diff --git a/CHANGELOG.md b/CHANGELOG.md index 3637805..7216522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ In Development -------------- - Add `--compress-filter-msgs` option +- Support all documented S3 Inventory fields in inventory lists v0.1.0-alpha.2 (2025-01-06) --------------------------- diff --git a/Cargo.lock b/Cargo.lock index 4792dd1..c8568d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -714,7 +714,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", - "serde", ] [[package]] @@ -1771,6 +1770,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" + [[package]] name = "ryu" version = "1.0.18" @@ -1803,6 +1808,7 @@ dependencies = [ "rstest", "serde", "serde_json", + "strum", "tempfile", "thiserror", "time", @@ -2015,6 +2021,28 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 7dbc2ba..3da4e65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,10 @@ percent-encoding = "2.3.1" regex = "1.11.1" serde = { version = "1.0.217", 
features = ["derive"] } serde_json = "1.0.134" +strum = { version = "0.26.3", features = ["derive"] } tempfile = "3.15.0" thiserror = "2.0.9" -time = { version = "0.3.37", features = ["macros", "parsing", "serde"] } +time = { version = "0.3.37", features = ["macros", "parsing"] } tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread", "signal"] } tokio-util = { version = "0.7.13", features = ["rt"] } tracing = "0.1.41" diff --git a/README.md b/README.md index d40f807..6181261 100644 --- a/README.md +++ b/README.md @@ -13,18 +13,8 @@ making use of the bucket's [Amazon S3 Inventory][inv] files. [inv]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html Currently, only versioned buckets and S3 Inventories with CSV output files are -supported, and the CSVs are required to have exactly the following fields, in -order: - -- `Bucket` -- `Key` -- `VersionId` -- `IsLatest` -- `IsDeleteMarker` -- `Size` -- `LastModifiedDate` -- `ETag` -- `IsMultipartUploaded` +supported, and the CSVs are required to list at least the `Bucket`, `Key`, +`VersionId`, and `ETag` fields. **Warning:** This is an in-development program. They may be bugs, and some planned features have not been implemented yet. diff --git a/src/inventory/fields.rs b/src/inventory/fields.rs new file mode 100644 index 0000000..d42be05 --- /dev/null +++ b/src/inventory/fields.rs @@ -0,0 +1,380 @@ +use super::item::{Directory, InventoryEntry, InventoryItem, ItemDetails}; +use crate::keypath::{KeyPath, KeyPathFromStringError}; +use serde::{ + de::{Deserializer, Unexpected}, + Deserialize, +}; +use std::collections::HashSet; +use std::fmt; +use thiserror::Error; +use time::OffsetDateTime; + +/// Fields that may be present in S3 Inventory list files +/// +/// See +/// +/// for more information on each field. 
+#[derive(Clone, Copy, Debug, strum::Display, strum::EnumString, Eq, Hash, PartialEq)] +pub(crate) enum InventoryField { + Bucket, + Key, + VersionId, + IsLatest, + IsDeleteMarker, + Size, + LastModifiedDate, + ETag, + IsMultipartUploaded, + StorageClass, + ReplicationStatus, + EncryptionStatus, + ObjectLockRetainUntilDate, + ObjectLockMode, + ObjectLockLegalHoldStatus, + IntelligentTieringAccessTier, + BucketKeyStatus, + ChecksumAlgorithm, + ObjectAccessControlList, + ObjectOwner, +} + +impl InventoryField { + /// `s3invsync` requires these fields to be present in every inventory list + /// file. + // IMPORTANT: If a field is ever removed from this list, the corresponding + // `if Some(field) = field else { unreachable!() };` statement in + // `FileSchema::parse_csv_fields()` must be removed as well. + const REQUIRED: [InventoryField; 4] = [ + InventoryField::Bucket, + InventoryField::Key, + InventoryField::VersionId, + InventoryField::ETag, + ]; +} + +/// A list of [`InventoryField`]s used by an inventory list file +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) struct FileSchema { + /// The fields, in order of appearance + fields: Vec, + + /// The index in `fields` at which `InventoryField::Key` is located (for + /// convenience) + key_index: usize, +} + +impl FileSchema { + /// Given a row of strings from an inventory list CSV file, parse them into + /// an [`InventoryEntry`] according to the file schema + pub(crate) fn parse_csv_fields( + &self, + values: Vec, + ) -> Result { + let Some(key) = values.get(self.key_index) else { + return Err(ParseEntryError::NoKey); + }; + let key = percent_encoding::percent_decode_str(key) + .decode_utf8() + .map(std::borrow::Cow::into_owned) + .map_err(|_| ParseEntryError::InvalidKey(key.to_owned()))?; + let expected_len = self.fields.len(); + let actual_len = values.len(); + if expected_len != actual_len { + return Err(ParseEntryError::SizeMismatch { + key, + expected_len, + actual_len, + }); + } + let mut bucket = None; 
+ let mut version_id = None; + let mut etag = None; + let mut is_latest = None; + let mut is_delete_marker = None; + let mut size = None; + let mut last_modified_date = None; + let mut etag_is_md5 = true; + for (&field, value) in std::iter::zip(&self.fields, values) { + match field { + InventoryField::Bucket => { + if value.is_empty() { + return Err(ParseEntryError::EmptyBucket(key)); + } + bucket = Some(value); + } + InventoryField::Key => (), + InventoryField::VersionId => { + if value.is_empty() { + return Err(ParseEntryError::EmptyVersionId(key)); + } + version_id = Some(value); + } + InventoryField::IsLatest => { + let Ok(b) = value.parse::() else { + return Err(ParseEntryError::Parse { + key, + field, + value, + expected: r#""true" or "false""#, + }); + }; + is_latest = Some(b); + } + InventoryField::IsDeleteMarker => { + let Ok(b) = value.parse::() else { + return Err(ParseEntryError::Parse { + key, + field, + value, + expected: r#""true" or "false""#, + }); + }; + is_delete_marker = Some(b); + } + InventoryField::Size => { + if !value.is_empty() { + let Ok(sz) = value.parse::() else { + return Err(ParseEntryError::Parse { + key, + field, + value, + expected: "an integer", + }); + }; + size = Some(sz); + } + } + InventoryField::LastModifiedDate => { + let Ok(ts) = OffsetDateTime::parse( + &value, + &time::format_description::well_known::Rfc3339, + ) else { + return Err(ParseEntryError::Parse { + key, + field, + value, + expected: "an ISO timestamp", + }); + }; + last_modified_date = Some(ts); + } + InventoryField::ETag => { + if !value.is_empty() { + etag = Some(value); + } + } + // TODO: If this field is absent, what can we assume about the + // etag? 
+ InventoryField::IsMultipartUploaded => { + if value == "true" { + etag_is_md5 = false; + } + } + InventoryField::StorageClass => (), + InventoryField::ReplicationStatus => (), + InventoryField::EncryptionStatus => { + if !matches!(value.as_str(), "NOT-SSE" | "SSE-S3") { + etag_is_md5 = false; + } + } + InventoryField::ObjectLockRetainUntilDate => (), + InventoryField::ObjectLockMode => (), + InventoryField::ObjectLockLegalHoldStatus => (), + InventoryField::IntelligentTieringAccessTier => (), + InventoryField::BucketKeyStatus => (), + InventoryField::ChecksumAlgorithm => (), + InventoryField::ObjectAccessControlList => (), + InventoryField::ObjectOwner => (), + } + } + let Some(bucket) = bucket else { + unreachable!("required field 'Bucket' should always be defined"); + }; + let Some(version_id) = version_id else { + unreachable!("required field 'VersionId' should always be defined"); + }; + let is_latest = is_latest.unwrap_or(true); + if key.ends_with('/') + && (is_delete_marker == Some(true) || size.is_none() || size.is_some_and(|sz| sz == 0)) + { + return Ok(InventoryEntry::Directory(Directory { + bucket, + key, + version_id, + })); + } + let key = KeyPath::try_from(key)?; + if is_delete_marker == Some(true) { + Ok(InventoryEntry::Item(InventoryItem { + bucket, + key, + version_id, + is_latest, + last_modified_date, + details: ItemDetails::Deleted, + })) + } else { + let Some(etag) = etag else { + return Err(ParseEntryError::NoEtag(key)); + }; + Ok(InventoryEntry::Item(InventoryItem { + bucket, + key, + version_id, + is_latest, + last_modified_date, + details: ItemDetails::Present { + size, + etag, + etag_is_md5, + }, + })) + } + } +} + +impl std::str::FromStr for FileSchema { + type Err = ParseFileSchemaError; + + fn from_str(s: &str) -> Result { + let mut fields = Vec::new(); + let mut seen = HashSet::new(); + for item in s.split(',') { + let item = item.trim(); + if item.is_empty() { + continue; + } + let Ok(f) = item.parse::() else { + return 
Err(ParseFileSchemaError::Unknown(item.to_owned())); + }; + fields.push(f); + if !seen.insert(f) { + return Err(ParseFileSchemaError::Duplicate(f)); + } + } + let missing = InventoryField::REQUIRED + .into_iter() + .filter(|f| !seen.contains(f)) + .collect::>(); + if !missing.is_empty() { + return Err(ParseFileSchemaError::MissingRequired(missing)); + } + let Some(key_index) = fields.iter().position(|&f| f == InventoryField::Key) else { + unreachable!( + "Key should be present in fields after ensuring required fields are present" + ); + }; + Ok(FileSchema { fields, key_index }) + } +} + +impl<'de> Deserialize<'de> for FileSchema { + fn deserialize>(deserializer: D) -> Result { + struct Visitor; + + impl serde::de::Visitor<'_> for Visitor { + type Value = FileSchema; + + fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("a comma-separated list of S3 Inventory list fields") + } + + fn visit_str(self, input: &str) -> Result + where + E: serde::de::Error, + { + input + .parse::() + .map_err(|e| E::invalid_value(Unexpected::Str(input), &e)) + } + } + + deserializer.deserialize_str(Visitor) + } +} + +/// Error returned by [`FileSchema::parse_csv_fields()`] on invalid input +#[derive(Clone, Debug, Eq, Error, PartialEq)] +pub(crate) enum ParseEntryError { + /// The input values lack a "key" field + #[error("inventory list entry is missing fields, including key")] + NoKey, + + /// The input values do not have the expected number of fields + #[error( + "inventory list entry for key {key:?} has {actual_len} fields; expected {expected_len}" + )] + SizeMismatch { + key: String, + expected_len: usize, + actual_len: usize, + }, + + /// The key field could not be percent-decoded + #[error("inventory list entry key {0:?} did not decode as percent-encoded UTF-8")] + InvalidKey(String), + + /// The input has an empty "bucket" field + #[error("inventory item {0:?} has empty bucket field")] + EmptyBucket(String), + + /// The input has an 
empty "version ID" field + #[error("inventory item {0:?} has empty version ID field")] + EmptyVersionId(String), + + /// Failed to parse an individual field + #[error("could not parse inventory list entry for key {key:?}, field {field}, field value {value:?}; expected {expected}")] + Parse { + key: String, + field: InventoryField, + value: String, + expected: &'static str, + }, + + /// The entry was not a delete marker and lacked an etag + #[error("non-deleted inventory item {0:?} lacks etag")] + NoEtag(KeyPath), + + /// The key was not an acceptable filepath + #[error("inventory item key is not an acceptable filepath")] + KeyPath(#[from] KeyPathFromStringError), +} + +/// Error returned by `FileSchema::from_str()` on invalid input +#[derive(Clone, Debug, Eq, Error, PartialEq)] +pub(crate) enum ParseFileSchemaError { + /// The list of fields contained an unknown/unrecognized field + #[error("unknown inventory field in fileSchema: {0:?}")] + Unknown(String), + + /// The list of fields contained some field more than once + #[error("duplicate inventory field in fileSchema: {0}")] + Duplicate(InventoryField), + + /// The list of fields was missing one or more fields required by s3invsync + #[error(fmt = fmt_missing)] + MissingRequired(Vec), +} + +impl serde::de::Expected for ParseFileSchemaError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "a comma-separated list of S3 Inventory list fields, but: {self}" + ) + } +} + +/// [`Display`][std::fmt::Display] formatter for the `MissingRequired` variant +/// of [`ParseFileSchemaError`] +fn fmt_missing(missing: &[InventoryField], f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "fileSchema is missing required fields: ")?; + let mut first = true; + for field in missing { + if !std::mem::replace(&mut first, false) { + write!(f, ", ")?; + } + write!(f, "{field}")?; + } + Ok(()) +} diff --git a/src/inventory/item.rs b/src/inventory/item.rs index 1f012f7..c331ecc 100644 ---
a/src/inventory/item.rs +++ b/src/inventory/item.rs @@ -1,13 +1,9 @@ -use crate::keypath::{KeyPath, KeyPathFromStringError}; +use crate::keypath::KeyPath; use crate::s3::S3Location; -use serde::{de, Deserialize}; -use std::fmt; -use thiserror::Error; use time::OffsetDateTime; /// An entry in an inventory list file -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -#[serde(try_from = "RawInventoryEntry")] +#[derive(Clone, Debug, Eq, PartialEq)] pub(crate) enum InventoryEntry { Directory(Directory), Item(InventoryItem), @@ -17,14 +13,14 @@ pub(crate) enum InventoryEntry { #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct Directory { /// The bucket on which the object is located - bucket: String, + pub(super) bucket: String, /// The object's key (ends in '/') // Not a KeyPath, as the key ends in '/': - key: String, + pub(super) key: String, /// The object's version ID - version_id: String, + pub(super) version_id: String, } impl Directory { @@ -52,7 +48,7 @@ pub(crate) struct InventoryItem { pub(crate) is_latest: bool, /// The object's date of last modification - pub(crate) last_modified_date: OffsetDateTime, + pub(crate) last_modified_date: Option, /// Metadata about the object's content pub(crate) details: ItemDetails, @@ -72,11 +68,11 @@ pub(crate) enum ItemDetails { /// This version of the object is not a delete marker Present { /// The object's size - size: i64, + size: Option, /// The object's etag etag: String, - /// Whether the object was uploaded as a multipart upload - is_multipart_uploaded: bool, + /// Whether the etag is an MD5 digest of the object's contents + etag_is_md5: bool, }, /// This version of the object is a delete marker @@ -92,7 +88,7 @@ impl ItemDetails { match self { ItemDetails::Present { etag, - is_multipart_uploaded: false, + etag_is_md5: true, .. 
} => Some(etag), _ => None, @@ -100,137 +96,16 @@ impl ItemDetails { } } -impl TryFrom for InventoryEntry { - type Error = InventoryEntryError; - - fn try_from(value: RawInventoryEntry) -> Result { - if value.key.ends_with('/') - && (value.is_delete_marker - || value.size.is_none() - || value.size.is_some_and(|sz| sz == 0)) - { - return Ok(InventoryEntry::Directory(Directory { - bucket: value.bucket, - key: value.key, - version_id: value.version_id, - })); - } - let key = KeyPath::try_from(value.key)?; - if value.is_delete_marker { - Ok(InventoryEntry::Item(InventoryItem { - bucket: value.bucket, - key, - version_id: value.version_id, - is_latest: value.is_latest, - last_modified_date: value.last_modified_date, - details: ItemDetails::Deleted, - })) - } else { - let Some(size) = value.size else { - return Err(InventoryEntryError::Size(key)); - }; - let Some(etag) = value.etag else { - return Err(InventoryEntryError::Etag(key)); - }; - let Some(is_multipart_uploaded) = value.is_multipart_uploaded else { - return Err(InventoryEntryError::Multipart(key)); - }; - Ok(InventoryEntry::Item(InventoryItem { - bucket: value.bucket, - key, - version_id: value.version_id, - is_latest: value.is_latest, - last_modified_date: value.last_modified_date, - details: ItemDetails::Present { - size, - etag, - is_multipart_uploaded, - }, - })) - } - } -} - -/// Error returned when parsing an inventory entry fails -#[derive(Clone, Debug, Eq, Error, PartialEq)] -pub(crate) enum InventoryEntryError { - /// The entry was not a delete marker and lacked a size - #[error("non-deleted inventory item {0:?} lacks size")] - Size(KeyPath), - - /// The entry was not a delete marker and lacked an etag - #[error("non-deleted inventory item {0:?} lacks etag")] - Etag(KeyPath), - - /// The entry was not a delete marker and lacked an is-multipart-uploaded - /// field - #[error("non-deleted inventory item {0:?} lacks is-multipart-uploaded field")] - Multipart(KeyPath), - - /// The key was not an acceptable 
filepath - // Serde (CSV?) errors don't show sources, so we need to include them - // manually: - #[error("inventory item key is not an acceptable filepath: {0}")] - KeyPath(#[from] KeyPathFromStringError), -} - -/// An entry directly parsed from an inventory list -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -struct RawInventoryEntry { - // IMPORTANT: The order of the fields must match that in - // `EXPECTED_FILE_SCHEMA` in `manifest.rs` - bucket: String, - #[serde(deserialize_with = "percent_decode")] - key: String, - version_id: String, - is_latest: bool, - is_delete_marker: bool, - size: Option, - #[serde(with = "time::serde::rfc3339")] - last_modified_date: OffsetDateTime, - etag: Option, - is_multipart_uploaded: Option, -} - -/// Deserialize a percent-encoded string -fn percent_decode<'de, D>(deserializer: D) -> Result -where - D: de::Deserializer<'de>, -{ - struct Visitor; - - impl de::Visitor<'_> for Visitor { - type Value = String; - - fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter.write_str("a percent-encoded UTF-8 string") - } - - fn visit_str(self, input: &str) -> Result - where - E: de::Error, - { - percent_encoding::percent_decode_str(input) - .decode_utf8() - .map(std::borrow::Cow::into_owned) - .map_err(|_| E::invalid_value(de::Unexpected::Str(input), &self)) - } - } - - deserializer.deserialize_str(Visitor) -} - #[cfg(test)] mod tests { use super::*; + use crate::inventory::{CsvReader, FileSchema}; use assert_matches::assert_matches; use time::macros::datetime; fn parse_csv(s: &str) -> InventoryEntry { - csv::ReaderBuilder::new() - .has_headers(false) - .from_reader(std::io::Cursor::new(s)) - .deserialize() + let file_schema = "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, IsMultipartUploaded".parse::().unwrap(); + CsvReader::new(s.as_bytes(), file_schema) .next() .unwrap() .unwrap() @@ -249,13 +124,13 @@ mod tests { ); assert_eq!(item.version_id, 
"nuYD8l5blCvLV3DbAiN1IXuwo7aF3F98"); assert!(item.is_latest); - assert_eq!(item.last_modified_date, datetime!(2022-12-12 13:20:39 UTC)); + assert_eq!(item.last_modified_date, Some(datetime!(2022-12-12 13:20:39 UTC))); assert_eq!( item.details, ItemDetails::Present { - size: 1511723, + size: Some(1511723), etag: "627c47efe292876b91978324485cd2ec".into(), - is_multipart_uploaded: false + etag_is_md5: true, } ); }); @@ -274,7 +149,7 @@ mod tests { ); assert_eq!(item.version_id, "t5w9XO56_Yi1eF6HE7KUgoLumufisMyo"); assert!(!item.is_latest); - assert_eq!(item.last_modified_date, datetime!(2022-12-11 17:55:08 UTC)); + assert_eq!(item.last_modified_date, Some(datetime!(2022-12-11 17:55:08 UTC))); assert_eq!(item.details, ItemDetails::Deleted); }); } @@ -292,13 +167,13 @@ mod tests { ); assert_eq!(item.version_id, "t4Z7oFATOK2678GfaU8oLcjWDMAS0RgK"); assert!(item.is_latest); - assert_eq!(item.last_modified_date, datetime!(2024-05-07 21:12:55 UTC)); + assert_eq!(item.last_modified_date, Some(datetime!(2024-05-07 21:12:55 UTC))); assert_eq!( item.details, ItemDetails::Present { - size: 38129, + size: Some(38129), etag: "f58c1f0e5fb20a9152788f825375884a".into(), - is_multipart_uploaded: false, + etag_is_md5: true, } ); }); diff --git a/src/inventory/list.rs b/src/inventory/list.rs index 4c4c82b..814f9eb 100644 --- a/src/inventory/list.rs +++ b/src/inventory/list.rs @@ -1,8 +1,9 @@ +use super::fields::{FileSchema, ParseEntryError}; use super::item::InventoryEntry; use crate::s3::S3Location; use flate2::bufread::GzDecoder; use std::fs::File; -use std::io::BufReader; +use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; use thiserror::Error; @@ -16,21 +17,18 @@ pub(crate) struct InventoryList { url: S3Location, /// The inner filehandle - inner: csv::DeserializeRecordsIntoIter>, InventoryEntry>, + inner: CsvReader>>, } impl InventoryList { - /// Construct an `InventoryList` from a local file handle `f` at path - /// `path`, downloaded from `url` - pub(crate) fn 
from_gzip_csv_file(path: PathBuf, url: S3Location, f: File) -> InventoryList { - InventoryList { - path, - url, - inner: csv::ReaderBuilder::new() - .has_headers(false) - .from_reader(GzDecoder::new(BufReader::new(f))) - .into_deserialize(), - } + /// Construct an `InventoryList` from a `CsvReader` reading from the file + /// at path `path` that was downloaded from `url` + pub(crate) fn for_downloaded_csv( + path: PathBuf, + url: S3Location, + inner: CsvReader>>, + ) -> InventoryList { + InventoryList { path, url, inner } } } @@ -58,5 +56,51 @@ impl Drop for InventoryList { #[error("failed to read entry from inventory list at {url}")] pub(crate) struct InventoryListError { url: S3Location, - source: csv::Error, + source: CsvReaderError, +} + +/// A struct for decoding [`InventoryEntry`]s from a reader containing CSV data +pub(crate) struct CsvReader { + inner: csv::DeserializeRecordsIntoIter>, + file_schema: FileSchema, +} + +impl CsvReader { + pub(crate) fn new(r: R, file_schema: FileSchema) -> Self { + CsvReader { + inner: csv::ReaderBuilder::new() + .has_headers(false) + .from_reader(r) + .into_deserialize(), + file_schema, + } + } +} + +impl CsvReader> { + pub(crate) fn from_gzipped_reader(r: R, file_schema: FileSchema) -> Self { + CsvReader::new(GzDecoder::new(r), file_schema) + } +} + +impl Iterator for CsvReader { + type Item = Result; + + fn next(&mut self) -> Option { + match self.inner.next()? { + Ok(values) => match self.file_schema.parse_csv_fields(values) { + Ok(entry) => Some(Ok(entry)), + Err(e) => Some(Err(e.into())), + }, + Err(e) => Some(Err(e.into())), + } + } +} + +#[derive(Debug, Error)] +pub(crate) enum CsvReaderError { + #[error("failed to read entry from CSV file")] + Csv(#[from] csv::Error), + #[error("failed to parse fields of CSV entry")] + Parse(#[from] ParseEntryError), } diff --git a/src/inventory/mod.rs b/src/inventory/mod.rs index 0e8992b..9c273a2 100644 --- a/src/inventory/mod.rs +++ b/src/inventory/mod.rs @@ -1,5 +1,7 @@ //! 
Inventory list files and their entries +mod fields; mod item; mod list; +pub(crate) use self::fields::*; pub(crate) use self::item::*; pub(crate) use self::list::*; diff --git a/src/manifest.rs b/src/manifest.rs index 0c4cd62..2fec5e7 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -1,26 +1,32 @@ +use crate::inventory::FileSchema; use serde::Deserialize; use thiserror::Error; -/// Currently, only manifests with this exact fileSchema value are supported. -static EXPECTED_FILE_SCHEMA: &str = "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, IsMultipartUploaded"; - /// A listing of CSV inventory files from a manifest #[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -#[serde(try_from = "Manifest")] +#[serde(try_from = "RawManifest")] pub(crate) struct CsvManifest { pub(crate) files: Vec, } -impl TryFrom for CsvManifest { +impl TryFrom for CsvManifest { type Error = ManifestError; - fn try_from(value: Manifest) -> Result { + fn try_from(value: RawManifest) -> Result { if value.file_format != FileFormat::Csv { Err(ManifestError::Format(value.file_format)) - } else if value.file_schema != EXPECTED_FILE_SCHEMA { - Err(ManifestError::Schema(value.file_schema)) } else { - Ok(CsvManifest { files: value.files }) + let files = value + .files + .into_iter() + .map(|spec| FileSpec { + key: spec.key, + size: spec.size, + md5_checksum: spec.md5_checksum, + file_schema: value.file_schema.clone(), + }) + .collect(); + Ok(CsvManifest { files }) } } } @@ -32,23 +38,19 @@ pub(crate) enum ManifestError { /// CSV #[error("inventory files are in {0:?} format; only CSV is supported")] Format(FileFormat), - - /// Returned when a manifest's fileSchema is not the supported value - #[error("inventory schema is unsupported {0:?}; expected {EXPECTED_FILE_SCHEMA:?}")] - Schema(String), } /// Parsed `manifest.json` file #[derive(Clone, Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "camelCase")] -struct Manifest { +struct RawManifest { 
//source_bucket: String, //destination_bucket: String, //version: String, //creation_timestamp: String, file_format: FileFormat, - file_schema: String, - files: Vec, + file_schema: FileSchema, + files: Vec, } /// The possible inventory list file formats @@ -63,7 +65,7 @@ pub(crate) enum FileFormat { } /// An entry in a manifest's "files" list pointing to an inventory list file -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct FileSpec { /// S3 object key of the inventory list file pub(crate) key: String, @@ -71,6 +73,23 @@ pub(crate) struct FileSpec { /// Size of the inventory list file pub(crate) size: i64, + /// MD5 digest of the inventory list file + pub(crate) md5_checksum: String, + + /// The fields used by the inventory list file + pub(crate) file_schema: FileSchema, +} + +/// An entry in a manifest's "files" list pointing to an inventory list file, +/// as deserialized directly from a manifest +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +pub(crate) struct RawFileSpec { + /// S3 object key of the inventory list file + pub(crate) key: String, + + /// Size of the inventory list file + pub(crate) size: i64, + /// MD5 digest of the inventory list file #[serde(rename = "MD5checksum")] pub(crate) md5_checksum: String, diff --git a/src/s3/mod.rs b/src/s3/mod.rs index 59c3646..a273216 100644 --- a/src/s3/mod.rs +++ b/src/s3/mod.rs @@ -3,7 +3,7 @@ mod location; mod streams; pub(crate) use self::location::S3Location; use self::streams::{ListManifestDates, ListObjectsError}; -use crate::inventory::InventoryList; +use crate::inventory::{CsvReader, InventoryList}; use crate::manifest::{CsvManifest, FileSpec}; use crate::timestamps::{Date, DateHM, DateMaybeHM}; use aws_credential_types::{ @@ -231,7 +231,8 @@ impl S3Client { url: url.clone(), source, })?; - Ok(InventoryList::from_gzip_csv_file(path, url, outfile)) + let reader = CsvReader::from_gzipped_reader(BufReader::new(outfile), fspec.file_schema); + 
Ok(InventoryList::for_downloaded_csv(path, url, reader)) } /// Download the object at `url` and write its bytes to `outfile`. If diff --git a/src/syncer.rs b/src/syncer.rs index 504e415..834913a 100644 --- a/src/syncer.rs +++ b/src/syncer.rs @@ -380,8 +380,10 @@ impl Syncer { path.display() ) })?; - fp.set_modified(item.last_modified_date.into()) - .with_context(|| format!("failed to set mtime on {}", path.display()))?; + if let Some(mtime) = item.last_modified_date { + fp.set_modified(mtime.into()) + .with_context(|| format!("failed to set mtime on {}", path.display()))?; + } Ok(true) } Some(Err(e)) => {