Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(aws_s3 source): Logs processed S3 objects #22083

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
`aws_s3` source now logs when S3 objects are fetched. If ACKs are enabled, it also logs on delivery.

authors: fdamstra
45 changes: 41 additions & 4 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ pub enum ProcessingError {
#[snafu(display("Unsupported S3 event version: {}.", version,))]
UnsupportedS3EventVersion { version: semver::Version },
#[snafu(display("Sink reported an error sending events"))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much better now. You can also use these fields in #[snafu(display... like UnsupportedS3EventVersion does above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much. I've incorporated these into the snafu string.

I appreciate all the handholding you've been providing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your contribution!
If all checks pass, this will be added to the merge queue.

ErrorAcknowledgement,
ErrorAcknowledgement {
region: String,
bucket: String,
key: String,
},
}

pub struct State {
Expand Down Expand Up @@ -393,6 +397,11 @@ impl IngestorProcess {
message_id: &message_id
});
if self.state.delete_message {
trace!(
message = "Queued SQS message for deletion",
id = message_id,
receipt_handle = receipt_handle,
);
delete_entries.push(
DeleteMessageBatchRequestEntry::builder()
.id(message_id)
Expand Down Expand Up @@ -517,6 +526,12 @@ impl IngestorProcess {

let object = object_result?;

info!(
message = "Got S3 object from SQS notification.",
bucket = s3_event.s3.bucket.name,
key = s3_event.s3.object.key
);

let metadata = object.metadata;

let timestamp = object.last_modified.map(|ts| {
Expand Down Expand Up @@ -641,13 +656,35 @@ impl IngestorProcess {
Some(receiver) => {
let result = receiver.await;
match result {
BatchStatus::Delivered => Ok(()),
BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement),
BatchStatus::Delivered => {
info!(
message = "S3 object from SQS delivered.",
bucket = s3_event.s3.bucket.name,
key = s3_event.s3.object.key,
);
Ok(())
}
BatchStatus::Errored => {
Err(ProcessingError::ErrorAcknowledgement {
bucket: s3_event.s3.bucket.name,
key: s3_event.s3.object.key,
region: s3_event.aws_region,
})
},
BatchStatus::Rejected => {
if self.state.delete_failed_message {
warn!(
message = "S3 object from SQS was rejected. Deleting failed message.",
bucket = s3_event.s3.bucket.name,
key = s3_event.s3.object.key,
);
Ok(())
} else {
Err(ProcessingError::ErrorAcknowledgement)
Err(ProcessingError::ErrorAcknowledgement {
bucket: s3_event.s3.bucket.name,
key: s3_event.s3.object.key,
region: s3_event.aws_region,
})
}
}
}
Expand Down