Skip to content

Commit

Permalink
Merge branch 'vectordotdev:master' into nji/cgroup-mem-stat
Browse files Browse the repository at this point in the history
  • Loading branch information
nionata authored Jan 9, 2025
2 parents e30a061 + 68ee4f2 commit 11cd4e9
Show file tree
Hide file tree
Showing 38 changed files with 299 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Changes the fingerprint for file sources to use uncompressed file content
as a source of truth when fingerprinting lines and checking
`ignored_header_bytes`. Previously this was using the compressed bytes. For now, only gzip compression is supported.

authors: roykim98
265 changes: 257 additions & 8 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::{
collections::HashSet,
fs::{self, metadata, File},
io::{self, Read, Seek, SeekFrom, Write},
io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};

use crc::Crc;
use flate2::bufread::GzDecoder;
use serde::{Deserialize, Serialize};
use vector_common::constants::GZIP_MAGIC;

use crate::{metadata_ext::PortableFileExt, FileSourceInternalEvents};

Expand Down Expand Up @@ -69,6 +71,95 @@ impl From<u64> for FileFingerprint {
}
}

/// Compression formats the fingerprinter can transparently decompress.
#[derive(Debug, Copy, Clone)]
enum SupportedCompressionAlgorithms {
    Gzip,
}

impl SupportedCompressionAlgorithms {
    /// All supported algorithms, listed from the shortest magic header to the
    /// longest. Callers rely on this monotonic ordering when probing files that
    /// may be shorter than the largest magic header.
    fn values() -> Vec<SupportedCompressionAlgorithms> {
        vec![Self::Gzip]
    }

    /// The magic bytes found at the very start of a file in this format.
    fn magic_header_bytes(&self) -> &'static [u8] {
        match self {
            Self::Gzip => GZIP_MAGIC,
        }
    }
}

/// Abstraction for obtaining a decompressed view of a (possibly compressed) file.
trait UncompressedReader {
    /// Detects which supported compression algorithm, if any, the file `fp` uses.
    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error>;
    /// Returns a buffered reader yielding the *uncompressed* contents of `fp`.
    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error>;
}

struct UncompressedReaderImpl;
impl UncompressedReader for UncompressedReaderImpl {
    /// Checks a file for supported compression algorithms by searching for
    /// supported magic header bytes.
    ///
    /// If an error occurs during reading, the file handler may become unusable,
    /// as the cursor position of the file may not be reset.
    ///
    /// # Arguments
    /// - `fp`: A mutable reference to the file to check.
    ///
    /// # Returns
    /// - `Ok(Some(algorithm))` if a supported compression algorithm is detected.
    /// - `Ok(None)` if no supported compression algorithm is detected.
    /// - `Err(std::io::Error)` if an I/O error occurs.
    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error> {
        let mut detected: Option<SupportedCompressionAlgorithms> = None;
        for candidate in SupportedCompressionAlgorithms::values() {
            // Magic headers differ in length across algorithms, and a probe
            // buffer longer than the file would fail the read — hence the
            // candidates are probed in monotonically increasing header size.
            let expected = candidate.magic_header_bytes();
            let mut actual = vec![0u8; expected.len()];

            fp.seek(SeekFrom::Start(0))?;
            if let Err(read_error) = fp.read_exact(&mut actual) {
                // Best-effort rewind before surfacing the read failure.
                fp.seek(SeekFrom::Start(0))?;
                return Err(read_error);
            }

            if actual == expected {
                detected = Some(candidate);
                break;
            }
        }
        // Always leave the cursor at the start for whoever reads next.
        fp.seek(SeekFrom::Start(0))?;
        Ok(detected)
    }

    /// Wraps `fp` in a decompressing reader when a supported compression
    /// format is detected; otherwise returns a plain buffered reader.
    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error> {
        // To support new compression algorithms, add a match arm below.
        match Self::check(fp)? {
            Some(SupportedCompressionAlgorithms::Gzip) => {
                Ok(Box::new(BufReader::new(GzDecoder::new(BufReader::new(fp)))))
            }
            // No compression detected: read the raw bytes.
            None => Ok(Box::new(BufReader::new(fp))),
        }
    }
}

/// Skips the first `n` bytes of `reader`, pulling data through the reader and
/// discarding it.
///
/// We cannot simply seek the underlying file by `n` because the file may be
/// compressed; to skip the first `n` *decompressed* bytes we must decompress
/// up to `n` and throw the output away.
///
/// If the stream ends before `n` bytes are available, the remainder is treated
/// as skipped and `Ok(())` is returned — mirroring the old seek-based behavior
/// of seeking past EOF. (Without the explicit EOF check, `fill_buf` keeps
/// returning an empty chunk at end-of-stream, `bytes_to_skip` stays 0, and the
/// loop never terminates.)
fn skip_first_n_bytes<R: BufRead>(reader: &mut R, n: usize) -> io::Result<()> {
    let mut skipped_bytes = 0;
    while skipped_bytes < n {
        let chunk = reader.fill_buf()?;
        if chunk.is_empty() {
            // EOF reached before `n` bytes: nothing more to skip.
            break;
        }
        let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes);
        reader.consume(bytes_to_skip);
        skipped_bytes += bytes_to_skip;
    }
    Ok(())
}

impl Fingerprinter {
pub fn get_fingerprint_of_file(
&self,
Expand All @@ -95,8 +186,10 @@ impl Fingerprinter {
} => {
buffer.resize(self.max_line_length, 0u8);
let mut fp = fs::File::open(path)?;
fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
let bytes_read = fingerprinter_read_until(fp, b'\n', lines, buffer)?;
let mut reader = UncompressedReaderImpl::reader(&mut fp)?;

skip_first_n_bytes(&mut reader, ignored_header_bytes)?;
let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer)?;
let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
Ok(FirstLinesChecksum(fingerprint))
}
Expand Down Expand Up @@ -281,12 +374,37 @@ fn fingerprinter_read_until(

#[cfg(test)]
mod test {
use std::{collections::HashSet, fs, io::Error, path::Path, time::Duration};
use std::{
collections::HashSet,
fs,
io::{Error, Read, Write},
path::Path,
time::Duration,
};

use tempfile::tempdir;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};

/// Gzip-compresses `data` and returns the complete compressed stream,
/// including the gzip header and footer.
///
/// Takes `&[u8]` rather than the previous `&mut [u8]`: the input is only read,
/// never mutated, and existing `&mut Vec<u8>` call sites still coerce.
fn gzip(data: &[u8]) -> Vec<u8> {
    let mut buffer = vec![];
    let mut encoder = GzEncoder::new(&mut buffer, flate2::Compression::default());
    encoder.write_all(data).expect("Failed to write data");
    encoder
        .finish()
        .expect("Failed to finish encoding with gzip footer");
    buffer
}

fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
let path = target_dir.path().join(file);
let mut file = fs::File::open(path).unwrap();
let mut content = Vec::new();
file.read_to_end(&mut content).unwrap();
content
}

#[test]
fn test_checksum_fingerprint() {
let fingerprinter = Fingerprinter {
Expand Down Expand Up @@ -366,10 +484,21 @@ mod test {

let empty = prepare_test("empty.log", b"");
let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
let one_line = prepare_test("one_line.log", b"hello world\n");
let one_line = prepare_test(
"one_line_duplicate_compressed.log",
&gzip(&mut b"hello world\n".to_vec()),
);
let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
let one_line_duplicate_compressed = prepare_test(
"one_line_duplicate_compressed.log",
&gzip(&mut b"hello world\n".to_vec()),
);
let one_line_continued =
prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
let one_line_continued_compressed = prepare_test(
"one_line_continued_compressed.log",
&gzip(&mut b"hello world\nthe next line\n".to_vec()),
);
let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");

let exactly_max_line_length =
Expand All @@ -395,15 +524,39 @@ mod test {
assert!(run(&exactly_max_line_length).is_ok());
assert!(run(&exceeding_max_line_length).is_ok());

assert_eq!(run(&one_line).unwrap(), run(&one_line_duplicate).unwrap());
assert_eq!(run(&one_line).unwrap(), run(&one_line_continued).unwrap());
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_duplicate_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_continued_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_duplicate_compressed).unwrap()
);
assert_eq!(
run(&one_line).unwrap(),
run(&one_line_continued_compressed).unwrap()
);

assert_ne!(run(&one_line).unwrap(), run(&different_two_lines).unwrap());

assert_eq!(
run(&exactly_max_line_length).unwrap(),
run(&exceeding_max_line_length).unwrap()
);

assert_ne!(
read_byte_content(&target_dir, "one_line_duplicate.log"),
read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
);

assert_ne!(
read_byte_content(&target_dir, "one_line_continued.log"),
read_byte_content(&target_dir, "one_line_continued_compressed.log")
);
}

#[test]
Expand Down Expand Up @@ -436,6 +589,15 @@ mod test {
"two_lines_continued.log",
b"hello world\nfrom vector\nthe next line\n",
);
let two_lines_duplicate_compressed = prepare_test(
"two_lines_duplicate_compressed.log",
&gzip(&mut b"hello world\nfrom vector\n".to_vec()),
);
let two_lines_continued_compressed = prepare_test(
"two_lines_continued_compressed.log",
&gzip(&mut b"hello world\nfrom vector\nthe next line\n".to_vec()),
);

let different_three_lines = prepare_test(
"different_three_lines.log",
b"line one\nline two\nine three\n",
Expand All @@ -453,11 +615,82 @@ mod test {

assert_eq!(run(&two_lines).unwrap(), run(&two_lines_duplicate).unwrap());
assert_eq!(run(&two_lines).unwrap(), run(&two_lines_continued).unwrap());
assert_eq!(
run(&two_lines).unwrap(),
run(&two_lines_duplicate_compressed).unwrap()
);
assert_eq!(
run(&two_lines).unwrap(),
run(&two_lines_continued_compressed).unwrap()
);

assert_ne!(
run(&two_lines).unwrap(),
run(&different_three_lines).unwrap()
);

assert_ne!(
read_byte_content(&target_dir, "two_lines_duplicate.log"),
read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
);
assert_ne!(
read_byte_content(&target_dir, "two_lines_continued.log"),
read_byte_content(&target_dir, "two_lines_continued_compressed.log")
);
}

/// Verifies that `ignored_header_bytes` is applied to the *decompressed*
/// content: files whose headers differ but whose post-header lines match must
/// fingerprint identically, with or without gzip compression.
#[test]
fn test_first_two_lines_checksum_fingerprint_with_headers() {
    let max_line_length = 64;
    let fingerprinter = Fingerprinter {
        strategy: FingerprintStrategy::FirstLinesChecksum {
            // 14 == b"some-header-1\n".len(): skips exactly the first line
            // of the fixtures below.
            ignored_header_bytes: 14,
            lines: 2,
        },
        max_line_length,
        ignore_not_found: false,
    };

    let target_dir = tempdir().unwrap();
    // Writes `contents` verbatim into a file under the temp dir, returns its path.
    let prepare_test = |file: &str, contents: &[u8]| {
        let path = target_dir.path().join(file);
        fs::write(&path, contents).unwrap();
        path
    };

    let two_lines = prepare_test(
        "two_lines.log",
        b"some-header-1\nhello world\nfrom vector\n",
    );
    // Same header, gzipped: must fingerprint identically to the raw file.
    let two_lines_compressed_same_header = prepare_test(
        "two_lines_compressed_same_header.log",
        &gzip(&mut b"some-header-1\nhello world\nfrom vector\n".to_vec()),
    );
    // Different header text but the same header *size*: the skipped region
    // covers it entirely, so the fingerprint should still match.
    let two_lines_compressed_same_header_size = prepare_test(
        "two_lines_compressed_same_header_size.log",
        &gzip(&mut b"some-header-2\nhello world\nfrom vector\n".to_vec()),
    );
    // A longer header (15 bytes) shifts what the 14-byte skip exposes, and the
    // body text differs too, so this fingerprint must not match.
    let two_lines_compressed_different_header_size = prepare_test(
        "two_lines_compressed_different_header_size.log",
        &gzip(&mut b"some-header-22\nhellow world\nfrom vector\n".to_vec()),
    );

    let mut buf = Vec::new();
    let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf);

    assert!(run(&two_lines).is_ok());
    assert_eq!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_same_header).unwrap()
    );
    assert_eq!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_same_header_size).unwrap()
    );
    assert_ne!(
        run(&two_lines).unwrap(),
        run(&two_lines_compressed_different_header_size).unwrap()
    );
}

#[test]
Expand Down Expand Up @@ -517,6 +750,22 @@ mod test {
.is_none());
}

#[test]
fn test_monotonic_compression_algorithms() {
    // When probing a file for magic header bytes, the probe buffer for one
    // algorithm may be longer than the file itself — which could simply mean
    // the file uses a *shorter* magic header, not that it is too small to
    // fingerprint. Probing in non-decreasing header-length order sidesteps
    // that edge case, so this test pins the ordering of `values()`.
    let header_lengths: Vec<usize> = super::SupportedCompressionAlgorithms::values()
        .into_iter()
        .map(|algorithm| algorithm.magic_header_bytes().len())
        .collect();
    assert!(
        header_lengths.windows(2).all(|pair| pair[0] <= pair[1]),
        "magic header byte lengths must be in non-decreasing order"
    );
}
#[derive(Clone)]
struct NoErrors;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target:
name: vector
command: /usr/bin/vector
cpu_allotment: 7
memory_allotment: 30g
memory_allotment: 8GiB

environment:
VECTOR_THREADS: 4
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target:
name: vector
command: /usr/bin/vector
cpu_allotment: 7
memory_allotment: 30g
memory_allotment: 8GiB

environment:
VECTOR_THREADS: 4
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target:
name: vector
command: /usr/bin/vector
cpu_allotment: 7
memory_allotment: 30g
memory_allotment: 8GiB

environment:
VECTOR_THREADS: 4
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ target:
name: vector
command: /usr/bin/vector
cpu_allotment: 7
memory_allotment: 30g
memory_allotment: 8GiB

environment:
VECTOR_THREADS: 4
1 change: 0 additions & 1 deletion regression/cases/file_to_blackhole/data/.gitkeep
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

Loading

0 comments on commit 11cd4e9

Please sign in to comment.