Skip to content

Commit

Permalink
Add num_replicas parameter to specify the number of stream replicas w…
Browse files Browse the repository at this point in the history
…hen in clustered mode

Signed-off-by: Marco Amador <amador.marco@gmail.com>
  • Loading branch information
mfamador committed Oct 4, 2024
1 parent 2d4241d commit 195c6aa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
12 changes: 11 additions & 1 deletion docs/modules/components/pages/inputs/nats_jetstream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ input:
ack_wait: 30s
max_ack_pending: 1024
create_if_not_exists: false
num_replicas: 1
storage_type: memory
nak_delay: 1m # No default (optional)
nak_delay_until_header: nak_delay_until
Expand Down Expand Up @@ -280,9 +281,18 @@ Create the `stream` and `subject` if do not exist.
*Default*: `false`
=== `num_replicas`
The number of stream replicas, only supported in clustered mode and if the stream is created when `create_if_not_exists` is set to true. Defaults to 1, maximum is 5.
*Type*: `int`
*Default*: `1`
=== `storage_type`
Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage
Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage.
*Type*: `string`
Expand Down
19 changes: 17 additions & 2 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].
Description("Create the `stream` and `subject` if do not exist.").
Advanced().
Default(false)).
Field(service.NewIntField("num_replicas").
Description("The number of stream replicas, only supported in clustered mode and if the stream is created when `create_if_not_exists` is set to true. Defaults to 1, maximum is 5.").
Advanced().
Default(1)).
Field(service.NewStringEnumField("storage_type", "memory", "file").
Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage").
Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage.").
Advanced().
Default("memory")).
Field(service.NewDurationField("nak_delay").
Expand Down Expand Up @@ -149,6 +153,7 @@ type jetStreamReader struct {
nakDelayUntilHeader string
maxAckPending int
createIfNotExists bool
numReplicas int
storageType string

log *service.Logger
Expand Down Expand Up @@ -243,6 +248,14 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
return nil, err
}
}
if conf.Contains("num_replicas") {
if j.numReplicas, err = conf.FieldInt("num_replicas"); err != nil {
return nil, err
}
if j.numReplicas < 1 || j.numReplicas > 5 {
return nil, fmt.Errorf("num_replicas %d is invalid, it must be between 1 and 5", j.numReplicas)
}
}
if conf.Contains("storage_type") {
if j.storageType, err = conf.FieldString("storage_type"); err != nil {
return nil, err
Expand Down Expand Up @@ -352,6 +365,7 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
var natsErr *nats.APIError
if errors.As(err, &natsErr) {
if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound {
// create stream and subject
_, err = jCtx.AddStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
Expand All @@ -366,10 +380,11 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
}
return nats.MemoryStorage
}(),
Replicas: j.numReplicas,
})
}
} else if strings.Contains(err.Error(), "does not match consumer") {
// create subject to existent stream .stream
// create subject on existent stream
_, err = jCtx.UpdateStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
Expand Down

0 comments on commit 195c6aa

Please sign in to comment.