Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support NAK delay on nats_jetstream input #2556

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

mfamador
Copy link
Contributor

@mfamador mfamador commented Apr 30, 2024

Add an optional parameter to make NATS Jetstream to delay the redelivery of all msgs when negatively acknowledged. It can be used as a retry mechanism when a transient error occurs.

input:
  nats_jetstream:
    urls:
      - localhost:4222
    subject: foo
    durable: bar
    nak_delay: 5m

pipeline:
  processors:
  -  . . . 

output:
  switch:
    cases:
    - check: 'errored()'
      output:
        reject: "rejecting due to processing error: ${! error() }"
    - output:
        resource: output_ok

Additionally, we can use a header on each msg to define the unix epoch timestamp until when we want to delay the msg processing. By default is nak_delay_until but we can set it with another name using the parameter nak_delay_until_header

input:
  nats_jetstream:
    subject: foo
    durable: bar

pipeline:
  processors:
  - mapping: throw("transient error")
    
output:
  switch:
    cases:
    - check: '@nak_delay_until.number() - timestamp_unix() > 0'
      output:
        reject: "not time to process it yet ${! @time_to_process }"
              
    - check: 'errored()'
      output:
        nats_jetstream:
          subject: foo
          headers:
            nak_delay_until: ${! (timestamp_unix() + @num_retries * 10).int64() }
            num_retries: ${! @num_retries }
        processors:
        - mapping: meta num_retries = @num_retries.number().or(0) + 1
            
    - output:
        drop: { } # our regular output
        processors:

It was also added an opt-in parameter to create the stream and subject if they not exist and another to specify the number of stream replicas when in clustered mode.

@mfamador mfamador requested a review from Jeffail as a code owner April 30, 2024 12:59
@mfamador mfamador marked this pull request as draft April 30, 2024 14:20
@mfamador mfamador changed the title Support a nak_delay on NATS Jetstream input Support a general nak_delay for all msgs and a header to set the nak_delay individually for each msg on NATS Jetstream input Apr 30, 2024
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch 2 times, most recently from c648244 to 10320aa Compare April 30, 2024 15:31
@mfamador mfamador marked this pull request as ready for review April 30, 2024 15:37
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch from 4c91ff2 to ee570c0 Compare May 23, 2024 18:34
@mfamador mfamador changed the title Support a general nak_delay for all msgs and a header to set the nak_delay individually for each msg on NATS Jetstream input Support a NAK delay May 23, 2024
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch 2 times, most recently from d9c4ee7 to 3678abe Compare May 23, 2024 20:15
@mfamador mfamador changed the title Support a NAK delay Support a NAK delay to nats_jetstream input May 23, 2024
@mfamador mfamador changed the title Support a NAK delay to nats_jetstream input Support NAK delay on nats_jetstream input May 23, 2024
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch 2 times, most recently from 955071f to f6875ac Compare May 29, 2024 11:11
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch 4 times, most recently from 033c28c to acf260b Compare June 4, 2024 12:20
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch from acf260b to 2d4241d Compare October 4, 2024 14:18
Signed-off-by: Marco Amador <amador.marco@gmail.com>
…hen in clustered mode

Signed-off-by: Marco Amador <amador.marco@gmail.com>
@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch from 195c6aa to 723a423 Compare January 10, 2025 21:43
@mfamador
Copy link
Contributor Author

mfamador commented Jan 11, 2025

Removed the part where we try to update the stream if we change the subject. This way it has the same behaviour as without the create_if_not_exists flag on which it fails if we don't change the durable/consumer as well. To be able to read to all subjects the stream is created with subjects "*"

@mfamador mfamador force-pushed the nats-jetstream-nak-delay branch from cdec89a to d947241 Compare January 11, 2025 14:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant