Skip to content

Commit

Permalink
chore: do not try to update stream to have the same behaviour as with…
Browse files Browse the repository at this point in the history
…out create_if_not_exists flag.
  • Loading branch information
mfamador committed Jan 11, 2025
1 parent 723a423 commit cdec89a
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -383,23 +382,6 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
Replicas: j.numReplicas,
})
}
} else if strings.Contains(err.Error(), "does not match consumer") {
// create subject on existent stream
_, err = jCtx.UpdateStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
if j.subject == "" {
return nil
}
return []string{j.subject}
}(),
Storage: func() nats.StorageType {
if j.storageType == "file" {
return nats.FileStorage
}
return nats.MemoryStorage
}(),
})
}
}
return err
Expand All @@ -410,6 +392,18 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
return nil
}

func unique(input []string) []string {

Check failure on line 395 in internal/impl/nats/input_jetstream.go

View workflow job for this annotation

GitHub Actions / golangci-lint

func `unique` is unused (unused)
uniqueMap := make(map[string]struct{})
var result []string
for _, item := range input {
if _, exists := uniqueMap[item]; !exists {
uniqueMap[item] = struct{}{}
result = append(result, item)
}
}
return result
}

func (j *jetStreamReader) disconnect() {
j.connMut.Lock()
defer j.connMut.Unlock()
Expand Down

0 comments on commit cdec89a

Please sign in to comment.