Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(aws_s3 source): Logs processed S3 objects #22083

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
`aws_s3` source now logs when S3 objects are fetched. If ACKs are enabled, it also logs on delivery.

authors: fdamstra
51 changes: 46 additions & 5 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,17 @@ pub enum ProcessingError {
},
#[snafu(display("Unsupported S3 event version: {}.", version,))]
UnsupportedS3EventVersion { version: semver::Version },
#[snafu(display("Sink reported an error sending events"))]
ErrorAcknowledgement,
#[snafu(display(
"Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
region,
bucket,
key
))]
ErrorAcknowledgement {
region: String,
bucket: String,
key: String,
},
}

pub struct State {
Expand Down Expand Up @@ -393,6 +402,11 @@ impl IngestorProcess {
message_id: &message_id
});
if self.state.delete_message {
trace!(
message = "Queued SQS message for deletion.",
id = message_id,
receipt_handle = receipt_handle,
);
delete_entries.push(
DeleteMessageBatchRequestEntry::builder()
.id(message_id)
Expand Down Expand Up @@ -517,6 +531,12 @@ impl IngestorProcess {

let object = object_result?;

info!(
message = "Got S3 object from SQS notification.",
bucket = s3_event.s3.bucket.name,
key = s3_event.s3.object.key
);

let metadata = object.metadata;

let timestamp = object.last_modified.map(|ts| {
Expand Down Expand Up @@ -641,13 +661,34 @@ impl IngestorProcess {
Some(receiver) => {
let result = receiver.await;
match result {
BatchStatus::Delivered => Ok(()),
BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement),
BatchStatus::Delivered => {
info!(
message = "S3 object from SQS delivered.",
bucket = s3_event.s3.bucket.name,
key = s3_event.s3.object.key,
);
Ok(())
}
BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
bucket: s3_event.s3.bucket.name,
key: s3_event.s3.object.key,
region: s3_event.aws_region,
}),
BatchStatus::Rejected => {
if self.state.delete_failed_message {
warn!(
message =
"S3 object from SQS was rejected. Deleting failed message.",
bucket = s3_event.s3.bucket.name,
key = s3_event.s3.object.key,
);
Ok(())
} else {
Err(ProcessingError::ErrorAcknowledgement)
Err(ProcessingError::ErrorAcknowledgement {
bucket: s3_event.s3.bucket.name,
key: s3_event.s3.object.key,
region: s3_event.aws_region,
})
}
}
}
Expand Down
Loading