enhancement(aws_s3 source): Logs processed S3 objects (#22083)
* `aws_s3` source now logs the S3 objects that were processed

Unlike the `aws_sqs` source type, the SQS message itself is not the
source of events. This logs the bucket and key for files that were
ingested via Vector in order to have a better audit log.

If acknowledgements are enabled, it also logs when they are
successfully processed.

* Adds changelog note

* Fixes naming of aws_s3 logging changelog entry

* Rephrased changelog

* Rephrased changelog

* Removes redundant error messages; Adds useful information to acknowledgement errors

* Rephrased to make it clearer what was delivered

* Outputs bucket and s3 during errors

* Added missing '.'
fdamstra authored Jan 10, 2025
1 parent 68ee4f2 commit 1ef01ae
Showing 2 changed files with 49 additions and 5 deletions.
3 changes: 3 additions & 0 deletions changelog.d/21128_log_aws_s3_objects_consumed.enhancement.md
@@ -0,0 +1,3 @@
+`aws_s3` source now logs when S3 objects are fetched. If ACKs are enabled, it also logs on delivery.
+
+authors: fdamstra
51 changes: 46 additions & 5 deletions src/sources/aws_s3/sqs.rs
@@ -214,8 +214,17 @@ pub enum ProcessingError {
     },
     #[snafu(display("Unsupported S3 event version: {}.", version,))]
     UnsupportedS3EventVersion { version: semver::Version },
-    #[snafu(display("Sink reported an error sending events"))]
-    ErrorAcknowledgement,
+    #[snafu(display(
+        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
+        region,
+        bucket,
+        key
+    ))]
+    ErrorAcknowledgement {
+        region: String,
+        bucket: String,
+        key: String,
+    },
 }
 
 pub struct State {
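
To make the new message format concrete, here is a minimal sketch of how the reworked `ErrorAcknowledgement` variant would render. It is an illustration only: the real enum derives its message through snafu, whereas this stand-in implements `Display` by hand so that it builds with the standard library alone. The `ack_error` helper and the sample region, bucket, and key values are hypothetical and do not appear in the commit.

```rust
use std::fmt;

// Stand-in for the `ErrorAcknowledgement` variant shown in the hunk above.
// Assumption: the real code uses `#[derive(Snafu)]`; `Display` is written
// by hand here only to keep the sketch dependency-free.
#[derive(Debug)]
pub enum ProcessingError {
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
}

impl fmt::Display for ProcessingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Same format string as the snafu `display` attribute in the diff.
            ProcessingError::ErrorAcknowledgement { region, bucket, key } => write!(
                f,
                "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
                region, bucket, key
            ),
        }
    }
}

// Hypothetical helper: builds the variant from borrowed strings.
pub fn ack_error(region: &str, bucket: &str, key: &str) -> ProcessingError {
    ProcessingError::ErrorAcknowledgement {
        region: region.to_string(),
        bucket: bucket.to_string(),
        key: key.to_string(),
    }
}
```

Calling `ack_error("us-east-1", "my-bucket", "logs/app.log").to_string()` yields a message that names the region and the full `s3://` location, which the previous unit variant could not carry.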
@@ -393,6 +402,11 @@ impl IngestorProcess {
                 message_id: &message_id
             });
             if self.state.delete_message {
+                trace!(
+                    message = "Queued SQS message for deletion.",
+                    id = message_id,
+                    receipt_handle = receipt_handle,
+                );
                 delete_entries.push(
                     DeleteMessageBatchRequestEntry::builder()
                         .id(message_id)
@@ -517,6 +531,12 @@ impl IngestorProcess {
 
         let object = object_result?;
 
+        info!(
+            message = "Got S3 object from SQS notification.",
+            bucket = s3_event.s3.bucket.name,
+            key = s3_event.s3.object.key
+        );
+
         let metadata = object.metadata;
 
         let timestamp = object.last_modified.map(|ts| {
@@ -641,13 +661,34 @@ impl IngestorProcess {
             Some(receiver) => {
                 let result = receiver.await;
                 match result {
-                    BatchStatus::Delivered => Ok(()),
-                    BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement),
+                    BatchStatus::Delivered => {
+                        info!(
+                            message = "S3 object from SQS delivered.",
+                            bucket = s3_event.s3.bucket.name,
+                            key = s3_event.s3.object.key,
+                        );
+                        Ok(())
+                    }
+                    BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
+                        bucket: s3_event.s3.bucket.name,
+                        key: s3_event.s3.object.key,
+                        region: s3_event.aws_region,
+                    }),
                     BatchStatus::Rejected => {
                         if self.state.delete_failed_message {
+                            warn!(
+                                message =
+                                    "S3 object from SQS was rejected. Deleting failed message.",
+                                bucket = s3_event.s3.bucket.name,
+                                key = s3_event.s3.object.key,
+                            );
                             Ok(())
                         } else {
-                            Err(ProcessingError::ErrorAcknowledgement)
+                            Err(ProcessingError::ErrorAcknowledgement {
+                                bucket: s3_event.s3.bucket.name,
+                                key: s3_event.s3.object.key,
+                                region: s3_event.aws_region,
+                            })
                         }
                     }
                 }
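
The acknowledgement handling in the hunk above can be summarised as a small decision function. The following is a rough sketch under stated assumptions, not the code from `sqs.rs`: `BatchStatus` and `ProcessingError` are reduced stand-ins for the real types, `ObjectRef` and `handle_ack` are hypothetical names introduced for the example, and the `info!`/`warn!` log calls are represented by comments so that the block needs only the standard library.

```rust
// Simplified stand-in for the batch status reported by the sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    Delivered,
    Errored,
    Rejected,
}

// Hypothetical bundle of the fields the diff reads from `s3_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectRef {
    pub region: String,
    pub bucket: String,
    pub key: String,
}

// Reduced stand-in for the error enum; only the variant touched by the commit.
#[derive(Debug, PartialEq, Eq)]
pub enum ProcessingError {
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
}

// Hypothetical function mirroring the `match result { ... }` arms in the diff.
pub fn handle_ack(
    status: BatchStatus,
    delete_failed_message: bool,
    object: ObjectRef,
) -> Result<(), ProcessingError> {
    // Build the error lazily so the object is only consumed on failure paths.
    let to_error = |o: ObjectRef| ProcessingError::ErrorAcknowledgement {
        region: o.region,
        bucket: o.bucket,
        key: o.key,
    };
    match status {
        // The diff logs "S3 object from SQS delivered." at info level here.
        BatchStatus::Delivered => Ok(()),
        // Errors now carry the region, bucket, and key of the object.
        BatchStatus::Errored => Err(to_error(object)),
        BatchStatus::Rejected => {
            if delete_failed_message {
                // The diff logs a warning here before treating the message as done.
                Ok(())
            } else {
                Err(to_error(object))
            }
        }
    }
}
```

In this sketch a rejected batch only counts as success when `delete_failed_message` is set, which matches the branch structure of the diff; every failure path returns an error that identifies the S3 object involved.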