Skip to content

Commit 1275f1a

Browse files
fix(kafka sink): retry messages that result in kafka policy violations (vectordotdev#22041)
* fix(kafka sink): retry messages that result in kafka policy violations Problem: Some messages were getting dropped by Vector due to Kafka throwing `PolicyViolation` errors. These should be retried as a policy can be as simple as a more aggressive rate limit. ---------- Solution: Retry any messages that had the `RDKafkaErrorCode::PolicyViolation` error. ---------- Note: A dynamic back off may be better, as there may be a rate limit out there that still needs more than 100ms to back off on requests. ---------- See the original issue at vectordotdev#22026 Closes vectordotdev#22026 * Update changelog.d/22026-retry-kafka-policy-violations.fix.md --------- Co-authored-by: Jesse Szwedko <jesse@szwedko.me>
1 parent 656136a commit 1275f1a

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Retry Kafka messages that error with a policy violation so messages are not lost.
2+
3+
authors: PriceHiller

src/sinks/kafka/service.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,12 @@ impl Service<KafkaRequest> for KafkaService {
159159
})
160160
.map_err(|(err, _)| err);
161161
}
162-
// Producer queue is full.
162+
// Producer queue is full or a policy has been violated and the request should
163+
// be retried
163164
Err((
164-
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
165+
KafkaError::MessageProduction(
166+
RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation,
167+
),
165168
original_record,
166169
)) => {
167170
if blocked_state.is_none() {

0 commit comments

Comments
 (0)