Skip to content

Commit

Permalink
Add an opt-in parameter to create the stream and subject if not exist
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Amador <amador.marco@gmail.com>
  • Loading branch information
mfamador committed May 24, 2024
1 parent ee570c0 commit 955071f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
37 changes: 37 additions & 0 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ You can access these metadata fields using
Description("The maximum number of outstanding acks to be allowed before consuming is halted.").
Advanced().
Default(1024)).
Field(service.NewBoolField("create_if_not_exists").
Description("Create the `stream` and `subject` if do not exist.").
Advanced().
Default(false)).
Field(service.NewStringEnumField("storage_type", "memory", "file").
Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage").
Advanced().
Default("memory")).
Field(service.NewDurationField("nak_delay").
Description("An optional delay duration on redelivering the messages when negatively acknowledged.").
Example("1m").
Expand Down Expand Up @@ -122,6 +130,8 @@ type jetStreamReader struct {
nakDelay time.Duration
nakDelayUntilHeader string
maxAckPending int
createIfNotExists bool
storageType string

log *service.Logger

Expand Down Expand Up @@ -207,6 +217,17 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
}

if conf.Contains("create_if_not_exists") {
if j.createIfNotExists, err = conf.FieldBool("create_if_not_exists"); err != nil {
return nil, err
}
}
if conf.Contains("storage_type") {
if j.storageType, err = conf.FieldString("storage_type"); err != nil {
return nil, err
}
}

if conf.Contains("nak_delay") {
if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil {
return nil, err
Expand Down Expand Up @@ -304,7 +325,23 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
natsSub, err = jCtx.QueueSubscribeSync(j.subject, j.queue, options...)
}
}

if err != nil {
var natsErr *nats.APIError
if j.createIfNotExists && errors.As(err, &natsErr) {
if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound {
_, err = jCtx.AddStream(&nats.StreamConfig{
Name: j.stream,
Subjects: []string{j.subject},
Storage: func() nats.StorageType {
if j.storageType == "file" {
return nats.FileStorage
}
return nats.MemoryStorage
}(),
})
}
}
return err
}

Expand Down
19 changes: 19 additions & 0 deletions website/docs/components/inputs/nats_jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ input:
deliver: all
ack_wait: 30s
max_ack_pending: 1024
create_if_not_exists: false
storage_type: memory
nak_delay: 1m # No default (optional)
nak_delay_until_header: nak_delay_until
tls:
Expand Down Expand Up @@ -243,6 +245,23 @@ The maximum number of outstanding acks to be allowed before consuming is halted.
Type: `int`
Default: `1024`

### `create_if_not_exists`

Create the `stream` and `subject` if do not exist.


Type: `bool`
Default: `false`

### `storage_type`

Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage


Type: `string`
Default: `"memory"`
Options: `memory`, `file`.

### `nak_delay`

An optional delay duration on redelivering the messages when negatively acknowledged.
Expand Down

0 comments on commit 955071f

Please sign in to comment.