Remove onConnect and onClose hooks from FranzReaderOrdered
Signed-off-by: Mihai Todor <todormihai@gmail.com>
mihaitodor committed Jan 13, 2025
1 parent 13cb279 commit 7888a52
Showing 5 changed files with 50 additions and 47 deletions.
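
In short, NewFranzReaderOrderedFromConfig loses its onConnectHookFn and closeHookFn parameters, the reader's kgo.Client field is exported as Client, and callers that need connect/close side effects now embed FranzReaderOrdered and override Connect/Close themselves, as redpandaMigratorInput does below. A minimal sketch of the updated constructor call follows; the import paths and the newReader helper are illustrative only (the kafka package is internal to this repository), and clientOpts is assumed to be built the same way as in the existing inputs:

package example

import (
	"github.com/redpanda-data/benthos/v4/public/service"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/redpanda-data/connect/v4/internal/impl/kafka"
)

// newReader shows the post-commit constructor shape: client options plus an
// optional record converter, with no onConnect/onClose hooks.
func newReader(conf *service.ParsedConfig, res *service.Resources, clientOpts []kgo.Opt) (*kafka.FranzReaderOrdered, error) {
	return kafka.NewFranzReaderOrderedFromConfig(conf, res,
		func() ([]kgo.Opt, error) { return clientOpts, nil }, // clientOptsFn
		nil, // recordToMessageFn: nil falls back to kafka.FranzRecordToMessageV1
	)
}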
2 changes: 1 addition & 1 deletion internal/impl/kafka/enterprise/redpanda_common_input.go
@@ -124,7 +124,7 @@ func init() {
time.Sleep(time.Millisecond * 100)
}
return
}, nil, nil, nil)
}, nil)
if err != nil {
return nil, err
}
44 changes: 33 additions & 11 deletions internal/impl/kafka/enterprise/redpanda_migrator_input.go
@@ -156,26 +156,48 @@ func init() {
}

return kafka.FranzRecordToMessageV1(record), true
},
func(ctx context.Context, res *service.Resources, client *kgo.Client) {
if err = kafka.FranzSharedClientSet(clientLabel, &kafka.FranzSharedClientInfo{
Client: client,
}, res); err != nil {
res.Logger().With("error", err).Warn("Failed to store client connection for sharing")
}
},
func(res *service.Resources) {
_, _ = kafka.FranzSharedClientPop(clientLabel, res)
})
if err != nil {
return nil, err
}

return service.AutoRetryNacksBatchedToggled(conf, rdr)
return service.AutoRetryNacksBatchedToggled(conf, &redpandaMigratorInput{
FranzReaderOrdered: rdr,
clientLabel: clientLabel,
mgr: mgr,
})
})
if err != nil {
panic(err)
}
}

//------------------------------------------------------------------------------

type redpandaMigratorInput struct {
*kafka.FranzReaderOrdered

clientLabel string

mgr *service.Resources
}

func (rmi *redpandaMigratorInput) Connect(ctx context.Context) error {
if err := rmi.FranzReaderOrdered.Connect(ctx); err != nil {
return err
}

if err := kafka.FranzSharedClientSet(rmi.clientLabel, &kafka.FranzSharedClientInfo{
Client: rmi.FranzReaderOrdered.Client,
}, rmi.mgr); err != nil {
rmi.mgr.Logger().Warnf("Failed to store client connection for sharing: %s", err)
}

return nil
}

func (rmi *redpandaMigratorInput) Close(ctx context.Context) error {
_, _ = kafka.FranzSharedClientPop(rmi.clientLabel, rmi.mgr)

return rmi.FranzReaderOrdered.Close(ctx)
}
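
This wrapper reproduces what the removed hooks previously did for the migrator input: after a successful Connect the now-exported Client is registered via FranzSharedClientSet so other components can share it, and Close pops it again with FranzSharedClientPop before shutting down the embedded reader.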
@@ -175,7 +175,7 @@ func init() {
msg.MetaSetMut("kafka_offset_metadata", offsetCommitValue.Metadata)

return msg, true
}, nil, nil)
})
if err != nil {
return nil, err
}
47 changes: 14 additions & 33 deletions internal/impl/kafka/franz_reader_ordered.go
@@ -72,12 +72,6 @@ type clientOptsFn func() ([]kgo.Opt, error)
// recordToMessageFn is a function that converts a Kafka record into a Message.
type recordToMessageFn func(record *kgo.Record) (*service.Message, bool)

// onConnectHookFn is a function which is executed when the Kafka client is connected.
type onConnectHookFn func(ctx context.Context, res *service.Resources, client *kgo.Client)

// closeHookFn is a function which is executed when the Kafka client gets closed.
type closeHookFn func(res *service.Resources)

// FranzReaderOrdered implements a kafka reader using the franz-go library.
type FranzReaderOrdered struct {
clientOptsFn clientOptsFn
@@ -86,15 +80,13 @@ type FranzReaderOrdered struct {
lagUpdater *asyncroutine.Periodic
topicLagGauge *service.MetricGauge
topicLagCache sync.Map
client *kgo.Client
Client *kgo.Client

consumerGroup string
commitPeriod time.Duration
topicLagRefreshPeriod time.Duration
cacheLimit uint64
recordToMessageFn recordToMessageFn
onConnectHookFn onConnectHookFn
closeHookFn closeHookFn
readBackOff backoff.BackOff

res *service.Resources
@@ -106,9 +98,7 @@ type FranzReaderOrdered struct {
// Optional parameters:
// - `recordToMessageFn` is a function that converts a Kafka record into a Message. If set to `nil`,
// `FranzRecordToMessageV1` is used instead.
// - `onConnectHookFn` is a function which is executed when the Kafka client is connected. It can be set to `nil`.
// - `closeHookFn` is a function which is executed when the Kafka client gets closed. It can be set to `nil`.
func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, clientOptsFn clientOptsFn, recordToMessageFn recordToMessageFn, onConnectHookFn onConnectHookFn, closeHookFn closeHookFn) (*FranzReaderOrdered, error) {
func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, clientOptsFn clientOptsFn, recordToMessageFn recordToMessageFn) (*FranzReaderOrdered, error) {
readBackOff := backoff.NewExponentialBackOff()
readBackOff.InitialInterval = time.Millisecond
readBackOff.MaxInterval = time.Millisecond * 100
@@ -125,8 +115,6 @@ func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Re
clientOptsFn: clientOptsFn,
topicLagGauge: res.Metrics().NewGauge("redpanda_lag", "topic", "partition"),
recordToMessageFn: recordToMessageFn,
onConnectHookFn: onConnectHookFn,
closeHookFn: closeHookFn,
}

f.consumerGroup, _ = conf.FieldString(kroFieldConsumerGroup)
@@ -393,10 +381,10 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {
commitFn := func(r *kgo.Record) {}
if f.consumerGroup != "" {
commitFn = func(r *kgo.Record) {
if f.client == nil {
if f.Client == nil {
return
}
f.client.MarkCommitRecords(r)
f.Client.MarkCommitRecords(r)
}
}

@@ -429,30 +417,26 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {
)
}

if f.client, err = kgo.NewClient(clientOpts...); err != nil {
if f.Client, err = kgo.NewClient(clientOpts...); err != nil {
return err
}

// Check connectivity to cluster
if err = f.client.Ping(ctx); err != nil {
if err = f.Client.Ping(ctx); err != nil {
return fmt.Errorf("failed to connect to cluster: %s", err)
}

topics := f.client.GetConsumeTopics()
topics := f.Client.GetConsumeTopics()
if len(topics) > 0 {
f.log.Debugf("Consuming from topics: %s", topics)
} else {
f.log.Warn("Topic filter did not match any existing topics")
}

if f.onConnectHookFn != nil {
f.onConnectHookFn(ctx, f.res, f.client)
}

if f.lagUpdater != nil {
f.lagUpdater.Stop()
}
adminClient := kadm.NewClient(f.client)
adminClient := kadm.NewClient(f.Client)
f.lagUpdater = asyncroutine.NewPeriodicWithContext(f.topicLagRefreshPeriod, func(ctx context.Context) {
ctx, done := context.WithTimeout(ctx, f.topicLagRefreshPeriod)
defer done()
@@ -480,10 +464,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {

go func() {
defer func() {
if f.closeHookFn != nil {
f.closeHookFn(f.res)
}
f.client.Close()
f.Client.Close()
if f.shutSig.IsSoftStopSignalled() {
f.shutSig.TriggerHasStopped()
}
@@ -500,7 +481,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {
// In this case we don't want to actually resume any of them yet so
// I add a forced timeout to deal with it.
stallCtx, pollDone := context.WithTimeout(closeCtx, time.Second)
fetches := f.client.PollFetches(stallCtx)
fetches := f.Client.PollFetches(stallCtx)
pollDone()

if errs := fetches.Errors(); len(errs) > 0 {
Expand All @@ -525,7 +506,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {
}

if nonTemporalErr {
f.client.Close()
f.Client.Close()
return
}
}
@@ -548,11 +529,11 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {
pauseTopicPartitions[p.Topic] = append(pauseTopicPartitions[p.Topic], p.Partition)
}
})
_ = f.client.PauseFetchPartitions(pauseTopicPartitions)
_ = f.Client.PauseFetchPartitions(pauseTopicPartitions)

noActivePartitions:
for {
pausedPartitionTopics := f.client.PauseFetchPartitions(nil)
pausedPartitionTopics := f.Client.PauseFetchPartitions(nil)

// Walk all the disabled topic partitions and check whether any
// of them can be resumed.
@@ -565,7 +546,7 @@ func (f *FranzReaderOrdered) Connect(ctx context.Context) error {
}
}
if len(resumeTopicPartitions) > 0 {
f.client.ResumeFetchPartitions(resumeTopicPartitions)
f.Client.ResumeFetchPartitions(resumeTopicPartitions)
}

if len(f.consumerGroup) == 0 || len(resumeTopicPartitions) > 0 || checkpoints.tallyActivePartitions(pausedPartitionTopics) > 0 {
2 changes: 1 addition & 1 deletion internal/impl/kafka/input_redpanda.go
@@ -124,7 +124,7 @@ func init() {

rdr, err := NewFranzReaderOrderedFromConfig(conf, mgr, func() ([]kgo.Opt, error) {
return clientOpts, nil
}, nil, nil, nil)
}, nil)
if err != nil {
return nil, err
}
