Skip to content

Commit

Permalink
fix(file source)!: use uncompressed content for fingerprinting files …
Browse files Browse the repository at this point in the history
…(lines and ignored_header_bytes) (#22050)

* Check the crc of uncompressed lines when reading from compressed gzip files

* Update documentation

* Handle header bytes properly and add a test

* Use fingerprint instead of fingerprinter

* Clarify that only gzip is supported at this time

* Propagate errors upward as alluded to in @jszwedko comments

* Refactor check

Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>

* Have to seek after finding algorithm too

Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>

* Remove whitespace

* Clippy

* make check-component-docs

* Attempt to reset the position and also add a doc comment

* Update changelog.d/22050-fingerprint-uncompressed-file-content.fix.md

Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>

* Use GZIP_MAGIC

* Update lib/file-source/src/fingerprinter.rs

---------

Signed-off-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>
Co-authored-by: Jesse Szwedko <jesse.szwedko@datadoghq.com>
Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
  • Loading branch information
3 people authored Jan 9, 2025
1 parent 45cd651 commit 68ee4f2
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Changes the fingerprint for file sources to use uncompressed file content
as a source of truth when fingerprinting lines and checking
`ignored_header_bytes`. Previously this was using the compressed bytes. For now, only gzip compression is supported.

authors: roykim98
265 changes: 257 additions & 8 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::{
collections::HashSet,
fs::{self, metadata, File},
io::{self, Read, Seek, SeekFrom, Write},
io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};

use crc::Crc;
use flate2::bufread::GzDecoder;
use serde::{Deserialize, Serialize};
use vector_common::constants::GZIP_MAGIC;

use crate::{metadata_ext::PortableFileExt, FileSourceInternalEvents};

Expand Down Expand Up @@ -69,6 +71,95 @@ impl From<u64> for FileFingerprint {
}
}

/// Compression formats whose uncompressed content can be used as the
/// source of truth when fingerprinting. Detection is done via magic
/// header bytes; only gzip is supported at this time.
#[derive(Debug, Copy, Clone)]
enum SupportedCompressionAlgorithms {
    Gzip,
}

impl SupportedCompressionAlgorithms {
    /// Every supported compression algorithm.
    ///
    /// The returned order is significant: algorithms must be listed from
    /// the smallest `magic_header_bytes` length to the largest (verified
    /// by `test_monotonic_compression_algorithms`).
    fn values() -> Vec<SupportedCompressionAlgorithms> {
        // Enumerate these from smallest magic_header_bytes to largest
        vec![Self::Gzip]
    }

    /// The magic bytes identifying this format at the start of a file.
    fn magic_header_bytes(&self) -> &'static [u8] {
        match *self {
            Self::Gzip => GZIP_MAGIC,
        }
    }
}

/// Abstraction for reading a file's content in uncompressed form,
/// regardless of whether the bytes on disk are compressed.
trait UncompressedReader {
    /// Detects which supported compression algorithm, if any, `fp` uses,
    /// by inspecting its leading magic header bytes.
    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error>;
    /// Returns a buffered reader yielding the uncompressed content of `fp`.
    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error>;
}

struct UncompressedReaderImpl;
impl UncompressedReader for UncompressedReaderImpl {
    /// Checks a file for supported compression algorithms by searching for
    /// supported magic header bytes.
    ///
    /// If an error occurs during reading, the file handler may become unusable,
    /// as the cursor position of the file may not be reset.
    ///
    /// # Arguments
    /// - `fp`: A mutable reference to the file to check.
    ///
    /// # Returns
    /// - `Ok(Some(algorithm))` if a supported compression algorithm is detected.
    /// - `Ok(None)` if no supported compression algorithm is detected.
    /// - `Err(std::io::Error)` if an I/O error occurs.
    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error> {
        let mut algorithm: Option<SupportedCompressionAlgorithms> = None;
        for compression_algorithm in SupportedCompressionAlgorithms::values() {
            // Magic headers differ in length across algorithms, and probing with
            // a buffer longer than the file would fail `read_exact`; so probe
            // each candidate with a buffer of exactly its header length, in
            // monotonically increasing order.
            let magic_header_bytes = compression_algorithm.magic_header_bytes();

            let mut magic = vec![0u8; magic_header_bytes.len()];

            fp.seek(SeekFrom::Start(0))?;
            if let Err(err) = fp.read_exact(&mut magic) {
                // Best-effort rewind before bailing so the handle stays usable.
                fp.seek(SeekFrom::Start(0))?;
                return Err(err);
            }

            if magic == magic_header_bytes {
                algorithm = Some(compression_algorithm);
                break;
            }
        }
        // Always rewind so subsequent reads start at the beginning of the file.
        fp.seek(SeekFrom::Start(0))?;
        Ok(algorithm)
    }

    /// Wraps `fp` in a decoder for the detected compression format, or in a
    /// plain `BufReader` when the file is not compressed.
    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error> {
        // To support new compression algorithms, add them below
        match Self::check(fp)? {
            Some(SupportedCompressionAlgorithms::Gzip) => {
                Ok(Box::new(BufReader::new(GzDecoder::new(BufReader::new(fp)))))
            }
            // No compression, or read the raw bytes
            None => Ok(Box::new(BufReader::new(fp))),
        }
    }
}

/// Discards the first `n` bytes of `reader`'s (possibly decompressed) output.
///
/// We cannot simply seek the file by `n` because the file may be compressed;
/// to skip the first `n` decompressed bytes, we decompress up to `n` and
/// discard the output.
///
/// Stops early at end-of-input: if fewer than `n` bytes are available, the
/// shortfall is silently ignored.
fn skip_first_n_bytes<R: BufRead>(reader: &mut R, n: usize) -> io::Result<()> {
    let mut skipped_bytes = 0;
    while skipped_bytes < n {
        let chunk = reader.fill_buf()?;
        if chunk.is_empty() {
            // EOF before `n` bytes were skipped. Without this check the loop
            // would spin forever consuming zero-length chunks.
            break;
        }
        let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes);
        reader.consume(bytes_to_skip);
        skipped_bytes += bytes_to_skip;
    }
    Ok(())
}

impl Fingerprinter {
pub fn get_fingerprint_of_file(
&self,
Expand All @@ -95,8 +186,10 @@ impl Fingerprinter {
} => {
buffer.resize(self.max_line_length, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
let bytes_read = fingerprinter_read_until(fp, b'\n', lines, buffer)?;
let mut reader = UncompressedReaderImpl::reader(&mut fp)?;

skip_first_n_bytes(&mut reader, ignored_header_bytes)?;
let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer)?;
let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
Ok(FirstLinesChecksum(fingerprint))
}
Expand Down Expand Up @@ -281,12 +374,37 @@ fn fingerprinter_read_until(

#[cfg(test)]
mod test {
use std::{collections::HashSet, fs, io::Error, path::Path, time::Duration};
use std::{
collections::HashSet,
fs,
io::{Error, Read, Write},
path::Path,
time::Duration,
};

use tempfile::tempdir;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};

/// Gzip-compresses `data`, returning the complete compressed stream
/// (magic header, deflate body, and CRC/length footer).
///
/// Takes `&[u8]` rather than `&mut [u8]`: `write_all` never mutates its
/// input, and the shared borrow still accepts `&mut Vec<u8>` arguments
/// via deref coercion, so existing callers are unaffected.
fn gzip(data: &[u8]) -> Vec<u8> {
    let mut buffer = vec![];
    let mut encoder = GzEncoder::new(&mut buffer, flate2::Compression::default());
    encoder.write_all(data).expect("Failed to write data");
    encoder
        .finish()
        .expect("Failed to finish encoding with gzip footer");
    buffer
}

fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
let path = target_dir.path().join(file);
let mut file = fs::File::open(path).unwrap();
let mut content = Vec::new();
file.read_to_end(&mut content).unwrap();
content
}

#[test]
fn test_checksum_fingerprint() {
let fingerprinter = Fingerprinter {
Expand Down Expand Up @@ -366,10 +484,21 @@ mod test {

let empty = prepare_test("empty.log", b"");
let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
let one_line = prepare_test("one_line.log", b"hello world\n");
let one_line = prepare_test(
"one_line_duplicate_compressed.log",
&gzip(&mut b"hello world\n".to_vec()),
);
let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
let one_line_duplicate_compressed = prepare_test(
"one_line_duplicate_compressed.log",
&gzip(&mut b"hello world\n".to_vec()),
);
let one_line_continued =
prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
let one_line_continued_compressed = prepare_test(
"one_line_continued_compressed.log",
&gzip(&mut b"hello world\nthe next line\n".to_vec()),
);
let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");

let exactly_max_line_length =
Expand All @@ -395,15 +524,39 @@ mod test {
assert!(run(&exactly_max_line_length).is_ok());
assert!(run(&exceeding_max_line_length).is_ok());

assert_eq!(run(&one_line).unwrap(), run(&one_line_duplicate).unwrap());
assert_eq!(run(&one_line).unwrap(), run(&one_line_continued).unwrap());
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_duplicate_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_continued_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_duplicate_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_continued_compressed).unwrap()
);

assert_ne!(run(&one_line).unwrap(), run(&different_two_lines).unwrap());

assert_eq!(
run(&exactly_max_line_length).unwrap(),
run(&exceeding_max_line_length).unwrap()
);

assert_ne!(
read_byte_content(&target_dir, "one_line_duplicate.log"),
read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
);

assert_ne!(
read_byte_content(&target_dir, "one_line_continued.log"),
read_byte_content(&target_dir, "one_line_continued_compressed.log")
);
}

#[test]
Expand Down Expand Up @@ -436,6 +589,15 @@ mod test {
"two_lines_continued.log",
b"hello world\nfrom vector\nthe next line\n",
);
let two_lines_duplicate_compressed = prepare_test(
"two_lines_duplicate_compressed.log",
&gzip(&mut b"hello world\nfrom vector\n".to_vec()),
);
let two_lines_continued_compressed = prepare_test(
"two_lines_continued_compressed.log",
&gzip(&mut b"hello world\nfrom vector\nthe next line\n".to_vec()),
);

let different_three_lines = prepare_test(
"different_three_lines.log",
b"line one\nline two\nine three\n",
Expand All @@ -453,11 +615,82 @@ mod test {

assert_eq!(run(&two_lines).unwrap(), run(&two_lines_duplicate).unwrap());
assert_eq!(run(&two_lines).unwrap(), run(&two_lines_continued).unwrap());
assert_eq!(
run(&two_lines).unwrap(),
run(&two_lines_duplicate_compressed).unwrap()
);
assert_eq!(
run(&two_lines).unwrap(),
run(&two_lines_continued_compressed).unwrap()
);

assert_ne!(
run(&two_lines).unwrap(),
run(&different_three_lines).unwrap()
);

assert_ne!(
read_byte_content(&target_dir, "two_lines_duplicate.log"),
read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
);
assert_ne!(
read_byte_content(&target_dir, "two_lines_continued.log"),
read_byte_content(&target_dir, "two_lines_continued_compressed.log")
);
}

#[test]
fn test_first_two_lines_checksum_fingerprint_with_headers() {
    let max_line_length = 64;
    // `ignored_header_bytes: 14` is measured against the *uncompressed*
    // content: exactly the length of "some-header-1\n" below.
    let fingerprinter = Fingerprinter {
        strategy: FingerprintStrategy::FirstLinesChecksum {
            ignored_header_bytes: 14,
            lines: 2,
        },
        max_line_length,
        ignore_not_found: false,
    };

    let target_dir = tempdir().unwrap();
    // Writes `contents` to `file` in the temp dir and returns its path.
    let prepare_test = |file: &str, contents: &[u8]| {
        let path = target_dir.path().join(file);
        fs::write(&path, contents).unwrap();
        path
    };

    // Uncompressed baseline: 14-byte header plus two lines.
    let two_lines = prepare_test(
        "two_lines.log",
        b"some-header-1\nhello world\nfrom vector\n",
    );
    // Identical content, gzipped: fingerprint must match the baseline.
    let two_lines_compressed_same_header = prepare_test(
        "two_lines_compressed_same_header.log",
        &gzip(&mut b"some-header-1\nhello world\nfrom vector\n".to_vec()),
    );
    // Different header text but same 14-byte header size: the header is
    // skipped, so the fingerprint must still match.
    let two_lines_compressed_same_header_size = prepare_test(
        "two_lines_compressed_same_header_size.log",
        &gzip(&mut b"some-header-2\nhello world\nfrom vector\n".to_vec()),
    );
    // 15-byte header: skipping only 14 bytes leaves part of the header in
    // the checksummed content, so the fingerprint must differ.
    let two_lines_compressed_different_header_size = prepare_test(
        "two_lines_compressed_different_header_size.log",
        &gzip(&mut b"some-header-22\nhellow world\nfrom vector\n".to_vec()),
    );

    let mut buf = Vec::new();
    let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf);

    assert!(run(&two_lines).is_ok());
    assert_eq!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_same_header).unwrap()
    );
    assert_eq!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_same_header_size).unwrap()
    );
    assert_ne!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_different_header_size).unwrap()
    );
}

#[test]
Expand Down Expand Up @@ -517,6 +750,22 @@ mod test {
.is_none());
}

#[test]
fn test_monotonic_compression_algorithms() {
    // When probing a file for magic header bytes, the probe buffer can be
    // longer than the file itself. That may simply mean the file uses a
    // compression format with a shorter header (or none), not that it is
    // unreadable. Guaranteeing that `values()` yields algorithms in
    // monotonically non-decreasing header length keeps that edge case from
    // occurring.
    let header_lengths: Vec<usize> = super::SupportedCompressionAlgorithms::values()
        .into_iter()
        .map(|algo| algo.magic_header_bytes().len())
        .collect();
    assert!(
        header_lengths.windows(2).all(|pair| pair[0] <= pair[1]),
        "magic header byte lengths must be in non-decreasing order"
    );
}
#[derive(Clone)]
struct NoErrors;

Expand Down
5 changes: 4 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ pub enum FingerprintConfig {
bytes: Option<usize>,

/// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
/// If the file is compressed, the number of bytes refers to the header in the uncompressed content. Only
/// gzip is supported at this time.
///
/// This can be helpful if all files share a common header that should be skipped.
#[serde(default = "default_ignored_header_bytes")]
Expand All @@ -306,7 +308,8 @@ pub enum FingerprintConfig {

/// The number of lines to read for generating the checksum.
///
/// If your files share a common header that is not always a fixed size,
/// The number of lines is determined from the uncompressed content if the file is compressed. Only
/// gzip is supported at this time.
///
/// If the file has less than this amount of lines, it won’t be read at all.
#[serde(default = "default_lines")]
Expand Down
5 changes: 4 additions & 1 deletion website/cue/reference/components/sources/base/file.cue
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ base: components: sources: file: configuration: {
ignored_header_bytes: {
description: """
The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
If the file is compressed, the number of bytes refers to the header in the uncompressed content. Only
gzip is supported at this time.
This can be helpful if all files share a common header that should be skipped.
"""
Expand All @@ -112,7 +114,8 @@ base: components: sources: file: configuration: {
description: """
The number of lines to read for generating the checksum.
If your files share a common header that is not always a fixed size,
The number of lines is determined from the uncompressed content if the file is compressed. Only
gzip is supported at this time.
If the file has less than this amount of lines, it won’t be read at all.
"""
Expand Down
Loading

0 comments on commit 68ee4f2

Please sign in to comment.