Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source)!: use uncompressed content for fingerprinting files (lines and ignored_header_bytes) #22050

Merged
merged 15 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Changes the fingerprint for file sources to use uncompressed file content
as a source of truth when fingerprinting lines and checking
`ignored_header_bytes`. Previously this was using the compressed bytes. For now, only gzip compression is supported.

authors: roykim98
264 changes: 256 additions & 8 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::{
collections::HashSet,
fs::{self, metadata, File},
io::{self, Read, Seek, SeekFrom, Write},
io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};

use crc::Crc;
use flate2::bufread::GzDecoder;
use serde::{Deserialize, Serialize};
use vector_common::constants::GZIP_MAGIC;

use crate::{metadata_ext::PortableFileExt, FileSourceInternalEvents};

Expand Down Expand Up @@ -69,6 +71,94 @@ impl From<u64> for FileFingerprint {
}
}

/// Compression formats whose magic headers can be detected when fingerprinting.
#[derive(Debug, Copy, Clone)]
enum SupportedCompressionAlgorithms {
    Gzip,
}

impl SupportedCompressionAlgorithms {
    /// Every supported algorithm, ordered from the shortest magic header to the
    /// longest. Callers probing small files rely on this ordering so that the
    /// shortest header is checked first.
    fn values() -> Vec<SupportedCompressionAlgorithms> {
        vec![Self::Gzip]
    }

    /// The magic bytes identifying this format at the start of a file.
    fn magic_header_bytes(&self) -> &'static [u8] {
        match self {
            Self::Gzip => GZIP_MAGIC,
        }
    }
}

/// Abstraction over obtaining a reader for a file's uncompressed content.
trait UncompressedReader {
    /// Probes `fp` for a known compression magic header.
    /// Returns `Ok(None)` when no supported format is detected.
    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error>;
    /// Builds a `BufRead` that yields the file's uncompressed bytes,
    /// decoding on the fly when a supported compression format is detected.
    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error>;
}

struct UncompressedReaderImpl;
impl UncompressedReader for UncompressedReaderImpl {
/// Checks a file for supported compression algorithms by searching for
/// supported magic header bytes.
///
/// If an error occurs during reading, the file handler may become unusable,
/// as the cursor position of the file may not be reset.
///
/// # Arguments
/// - `fp`: A mutable reference to the file to check.
///
/// # Returns
/// - `Ok(Some(algorithm))` if a supported compression algorithm is detected.
/// - `Ok(None)` if no supported compression algorithm is detected.
/// - `Err(std::io::Error)` if an I/O error occurs.
fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error> {
let mut algorithm: Option<SupportedCompressionAlgorithms> = None;
for compression_algorithm in SupportedCompressionAlgorithms::values() {
// magic headers for algorithms can be of different lengths, and using a buffer too long could exceed the length of the file
// so instantiate and check the various sizes in monotonically increasing order
let magic_header_bytes = compression_algorithm.magic_header_bytes();

let mut magic = vec![0u8; magic_header_bytes.len()];

fp.seek(SeekFrom::Start(0))?;
let result = fp.read_exact(&mut magic);

if result.is_err() {
fp.seek(SeekFrom::Start(0))?;
return Err(result.unwrap_err());
}

if magic == magic_header_bytes {
algorithm = Some(compression_algorithm);
break;
}
}
fp.seek(SeekFrom::Start(0))?;
Ok(algorithm)
}
fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error> {
pront marked this conversation as resolved.
Show resolved Hide resolved
// To support new compression algorithms, add them below
match Self::check(fp)? {
Some(SupportedCompressionAlgorithms::Gzip) => {
Ok(Box::new(BufReader::new(GzDecoder::new(BufReader::new(fp)))))
}
// No compression, or read the raw bytes
None => Ok(Box::new(BufReader::new(fp))),
}
}
}

/// Skips the first `n` bytes of `reader`, discarding the output.
///
/// We cannot simply seek the underlying file by `n` because the file may be
/// compressed; to skip the first `n` decompressed bytes, we decompress up to
/// `n` and discard the output.
///
/// # Errors
/// Returns `ErrorKind::UnexpectedEof` if the reader is exhausted before `n`
/// bytes could be skipped, and propagates any I/O error from `fill_buf`.
fn skip_first_n_bytes<R: BufRead>(reader: &mut R, n: usize) -> io::Result<()> {
    let mut skipped_bytes = 0;
    while skipped_bytes < n {
        let chunk = reader.fill_buf()?;
        if chunk.is_empty() {
            // EOF before n bytes were skipped. Without this check the loop
            // would spin forever, consuming zero bytes per iteration.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "reached EOF before skipping the requested number of bytes",
            ));
        }
        let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes);
        reader.consume(bytes_to_skip);
        skipped_bytes += bytes_to_skip;
    }
    Ok(())
}

impl Fingerprinter {
pub fn get_fingerprint_of_file(
&self,
Expand All @@ -95,8 +185,10 @@ impl Fingerprinter {
} => {
buffer.resize(self.max_line_length, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
let bytes_read = fingerprinter_read_until(fp, b'\n', lines, buffer)?;
let mut reader = UncompressedReaderImpl::reader(&mut fp)?;

skip_first_n_bytes(&mut reader, ignored_header_bytes)?;
let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer)?;
let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
Ok(FirstLinesChecksum(fingerprint))
}
Expand Down Expand Up @@ -281,12 +373,37 @@ fn fingerprinter_read_until(

#[cfg(test)]
mod test {
use std::{collections::HashSet, fs, io::Error, path::Path, time::Duration};
use std::{
collections::HashSet,
fs,
io::{Error, Read, Write},
path::Path,
time::Duration,
};

use tempfile::tempdir;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};

/// Gzip-compresses `data` and returns the compressed bytes.
///
/// Takes a shared slice: compression never mutates its input. Existing
/// callers passing `&mut Vec<u8>` still compile via deref coercion.
fn gzip(data: &[u8]) -> Vec<u8> {
    let mut buffer = vec![];
    let mut encoder = GzEncoder::new(&mut buffer, flate2::Compression::default());
    encoder.write_all(data).expect("Failed to write data");
    encoder
        .finish()
        .expect("Failed to finish encoding with gzip footer");
    buffer
}

fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
let path = target_dir.path().join(file);
let mut file = fs::File::open(path).unwrap();
let mut content = Vec::new();
file.read_to_end(&mut content).unwrap();
content
}

#[test]
fn test_checksum_fingerprint() {
let fingerprinter = Fingerprinter {
Expand Down Expand Up @@ -366,10 +483,21 @@ mod test {

let empty = prepare_test("empty.log", b"");
let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
let one_line = prepare_test("one_line.log", b"hello world\n");
let one_line = prepare_test(
"one_line_duplicate_compressed.log",
&gzip(&mut b"hello world\n".to_vec()),
);
let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
let one_line_duplicate_compressed = prepare_test(
"one_line_duplicate_compressed.log",
&gzip(&mut b"hello world\n".to_vec()),
);
let one_line_continued =
prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
let one_line_continued_compressed = prepare_test(
"one_line_continued_compressed.log",
&gzip(&mut b"hello world\nthe next line\n".to_vec()),
);
let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");

let exactly_max_line_length =
Expand All @@ -395,15 +523,39 @@ mod test {
assert!(run(&exactly_max_line_length).is_ok());
assert!(run(&exceeding_max_line_length).is_ok());

assert_eq!(run(&one_line).unwrap(), run(&one_line_duplicate).unwrap());
assert_eq!(run(&one_line).unwrap(), run(&one_line_continued).unwrap());
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_duplicate_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_continued_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_duplicate_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_continued_compressed).unwrap()
);

assert_ne!(run(&one_line).unwrap(), run(&different_two_lines).unwrap());

assert_eq!(
run(&exactly_max_line_length).unwrap(),
run(&exceeding_max_line_length).unwrap()
);

assert_ne!(
read_byte_content(&target_dir, "one_line_duplicate.log"),
read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
);

assert_ne!(
read_byte_content(&target_dir, "one_line_continued.log"),
read_byte_content(&target_dir, "one_line_continued_compressed.log")
);
}

#[test]
Expand Down Expand Up @@ -436,6 +588,15 @@ mod test {
"two_lines_continued.log",
b"hello world\nfrom vector\nthe next line\n",
);
let two_lines_duplicate_compressed = prepare_test(
"two_lines_duplicate_compressed.log",
&gzip(&mut b"hello world\nfrom vector\n".to_vec()),
);
let two_lines_continued_compressed = prepare_test(
"two_lines_continued_compressed.log",
&gzip(&mut b"hello world\nfrom vector\nthe next line\n".to_vec()),
);

let different_three_lines = prepare_test(
"different_three_lines.log",
b"line one\nline two\nine three\n",
Expand All @@ -453,11 +614,82 @@ mod test {

assert_eq!(run(&two_lines).unwrap(), run(&two_lines_duplicate).unwrap());
assert_eq!(run(&two_lines).unwrap(), run(&two_lines_continued).unwrap());
assert_eq!(
run(&two_lines).unwrap(),
run(&two_lines_duplicate_compressed).unwrap()
);
assert_eq!(
run(&two_lines).unwrap(),
run(&two_lines_continued_compressed).unwrap()
);

assert_ne!(
run(&two_lines).unwrap(),
run(&different_three_lines).unwrap()
);

assert_ne!(
read_byte_content(&target_dir, "two_lines_duplicate.log"),
read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
);
assert_ne!(
read_byte_content(&target_dir, "two_lines_continued.log"),
read_byte_content(&target_dir, "two_lines_continued_compressed.log")
);
}

#[test]
fn test_first_two_lines_checksum_fingerprint_with_headers() {
    let max_line_length = 64;
    let fingerprinter = Fingerprinter {
        strategy: FingerprintStrategy::FirstLinesChecksum {
            // 14 == len("some-header-1\n"): skips exactly one fixture header line.
            ignored_header_bytes: 14,
            lines: 2,
        },
        max_line_length,
        ignore_not_found: false,
    };

    let target_dir = tempdir().unwrap();
    // Writes `contents` verbatim into a file under the temp dir and returns its path.
    let prepare_test = |file: &str, contents: &[u8]| {
        let path = target_dir.path().join(file);
        fs::write(&path, contents).unwrap();
        path
    };

    let two_lines = prepare_test(
        "two_lines.log",
        b"some-header-1\nhello world\nfrom vector\n",
    );
    // Same uncompressed content as `two_lines`, gzipped.
    let two_lines_compressed_same_header = prepare_test(
        "two_lines_compressed_same_header.log",
        &gzip(&mut b"some-header-1\nhello world\nfrom vector\n".to_vec()),
    );
    // Different header text, but the same 14-byte length, so the skipped
    // region differs while the fingerprinted lines are identical.
    let two_lines_compressed_same_header_size = prepare_test(
        "two_lines_compressed_same_header_size.log",
        &gzip(&mut b"some-header-2\nhello world\nfrom vector\n".to_vec()),
    );
    // 15-byte header ("some-header-22\n"): skipping 14 bytes leaves one header
    // byte in the checksummed region, so the fingerprint must differ.
    let two_lines_compressed_different_header_size = prepare_test(
        "two_lines_compressed_different_header_size.log",
        &gzip(&mut b"some-header-22\nhellow world\nfrom vector\n".to_vec()),
    );

    let mut buf = Vec::new();
    let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf);

    assert!(run(&two_lines).is_ok());
    // Uncompressed and compressed content with identical post-header lines
    // must fingerprint identically.
    assert_eq!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_same_header).unwrap()
    );
    assert_eq!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_same_header_size).unwrap()
    );
    assert_ne!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_different_header_size).unwrap()
    );
}

#[test]
Expand Down Expand Up @@ -517,6 +749,22 @@ mod test {
.is_none());
}

#[test]
fn test_monotonic_compression_algorithms() {
    // Guards an edge case: when probing a file's magic header bytes, the file
    // may be shorter than the magic header being tested. That could mean the
    // file is simply too small — or that it uses a format with a shorter
    // header. Probing headers in monotonically non-decreasing length order
    // guarantees the shorter formats are always checked first, so this test
    // asserts `values()` preserves that ordering.
    let mut previous_len = 0;
    for algorithm in super::SupportedCompressionAlgorithms::values() {
        let len = algorithm.magic_header_bytes().len();
        assert!(
            previous_len <= len,
            "magic header lengths must be monotonically non-decreasing"
        );
        previous_len = len;
    }
}
#[derive(Clone)]
struct NoErrors;

Expand Down
5 changes: 4 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ pub enum FingerprintConfig {
bytes: Option<usize>,

/// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
/// If the file is compressed, the number of bytes refers to the header in the uncompressed content.
/// Only gzip compression is supported at this time.
///
/// This can be helpful if all files share a common header that should be skipped.
#[serde(default = "default_ignored_header_bytes")]
Expand All @@ -306,7 +308,8 @@ pub enum FingerprintConfig {

/// The number of lines to read for generating the checksum.
///
/// If your files share a common header that is not always a fixed size,
/// If the file is compressed, the number of lines is determined from the uncompressed content.
/// Only gzip compression is supported at this time.
///
/// If the file has less than this amount of lines, it won’t be read at all.
#[serde(default = "default_lines")]
Expand Down
5 changes: 4 additions & 1 deletion website/cue/reference/components/sources/base/file.cue
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ base: components: sources: file: configuration: {
ignored_header_bytes: {
description: """
The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
If the file is compressed, the number of bytes refers to the header in the uncompressed content.
Only gzip compression is supported at this time.

This can be helpful if all files share a common header that should be skipped.
"""
Expand All @@ -112,7 +114,8 @@ base: components: sources: file: configuration: {
description: """
The number of lines to read for generating the checksum.

If your files share a common header that is not always a fixed size,
If the file is compressed, the number of lines is determined from the uncompressed content.
Only gzip compression is supported at this time.

If the file has less than this amount of lines, it won’t be read at all.
"""
Expand Down
Loading
Loading