From 9cb8972dee7cde14755358f8551f62ce7356a23a Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Tue, 23 Apr 2024 20:12:56 +0200 Subject: [PATCH 1/2] Refactor connection options in consumer Signed-off-by: Dusan Malusev --- README.md | 16 +++++++------- amqp_test.go | 29 ++++--------------------- consumer/config.go | 45 --------------------------------------- consumer/consumer.go | 44 +++++++++++++++++++++++++++----------- examples/consumer/main.go | 19 ++++++++--------- 5 files changed, 52 insertions(+), 101 deletions(-) diff --git a/README.md b/README.md index 9a58262..dc07a43 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![codecov](https://codecov.io/gh/nano-interactive/go-amqp/branch/master/graph/badge.svg?token=JQTAGQ11DS)](https://codecov.io/gh/nano-interactive/go-amqp) [![Go Report Card](https://goreportcard.com/badge/github.com/nano-interactive/go-amqp)](https://goreportcard.com/report/github.com/nano-interactive/go-amqp) -# Introduction +## Introduction Having a distributed system with a message broker and no good wrappers for AMQP can be a hard work to develop, maintain and keeps the system running. Working with async protocol in a language that does not support async code is a nightmare, especially with system that need to run 24/7/365. A lot of things can go wrong (e.g connection breaks, channel closes, memory leaks ...). @@ -52,7 +52,6 @@ is places in the library so that the users of the library don't even think about ```go -// consumer (c) is non blocking c, err := consumer.NewFunc( handler, consumer.QueueDeclare{QueueName: "testing_queue"}, @@ -68,6 +67,8 @@ c, err := consumer.NewFunc( }), ) +c.Start(context.Background()) + time.Sleep(100*time.Second) c.Close() @@ -85,8 +86,8 @@ type Message { } func handler(ctx context.Context, msg Message) error { - fmt.Printf("[INFO] Message received: %s\n", msg.Name) - return nil + fmt.Printf("[INFO] Message received: %s\n", msg.Name) + return nil } ``` @@ -101,8 +102,8 @@ type Message { type MyHandler struct{} func (h MyHandler) Handle(ctx context.Context, msg Message) error { - fmt.Printf("[INFO] Message received: %s\n", msg.Name) - return nil + fmt.Printf("[INFO] Message received: %s\n", msg.Name) + return nil } ``` @@ -172,7 +173,6 @@ if err := pub.Close(); err != nil { ``` - ### Testing Example -`go-amqp` library provides a few testing helpers for integration testing. Functions for setting up AMQP channels, queues and exchanges (binding them together), publishing and consuming messages for asserting. \ No newline at end of file +`go-amqp` library provides a few testing helpers for integration testing. Functions for setting up AMQP channels, queues and exchanges (binding them together), publishing and consuming messages for asserting. diff --git a/amqp_test.go b/amqp_test.go index 5757f6a..38c7921 100644 --- a/amqp_test.go +++ b/amqp_test.go @@ -32,16 +32,11 @@ func handler(_ context.Context, msg Message) error { func ExampleConsumer() { c, err := consumer.NewFunc(handler, + connection.DefaultConfig, consumer.QueueDeclare{QueueName: "testing_queue"}, consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) { _, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) }), - consumer.WithConnectionOptions[Message](connection.Config{ - Host: "127.0.0.1", - User: "guest", - Password: "guest", - ConnectionName: "go-amqp-consumer", - }), ) if err != nil { panic(err) @@ -72,16 +67,11 @@ func (h MyHandler) Handle(_ context.Context, msg Message) error { func ExampleConsumerWithHandler() { c, err := consumer.New[Message](MyHandler{}, + connection.DefaultConfig, consumer.QueueDeclare{QueueName: "testing_queue"}, consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) { _, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) }), - consumer.WithConnectionOptions[Message](connection.Config{ - Host: "127.0.0.1", - User: "guest", - Password: "guest", - ConnectionName: "go-amqp-consumer", - }), ) if err != nil { panic(err) @@ -109,17 +99,11 @@ func ExampleConsumerWithSignal() { signal.Notify(sig, os.Interrupt, syscall.SIGTERM) c, err := consumer.NewFunc(handler, + connection.DefaultConfig, consumer.QueueDeclare{QueueName: "testing_queue"}, consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) { _, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) }), - consumer.WithContext[Message](ctx), - consumer.WithConnectionOptions[Message](connection.Config{ - Host: "127.0.0.1", - User: "guest", - Password: "guest", - ConnectionName: "go-amqp-consumer", - }), ) if err != nil { panic(err) @@ -157,16 +141,11 @@ func (h MyRawHandler) Handle(_ context.Context, d *amqp091.Delivery) error { func Example_ConsumerWithRawHandler() { c, err := consumer.NewRaw(MyRawHandler{}, + connection.DefaultConfig, consumer.QueueDeclare{QueueName: "testing_queue"}, consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) { _, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) }), - consumer.WithConnectionOptions[Message](connection.Config{ - Host: "127.0.0.1", - User: "guest", - Password: "guest", - ConnectionName: "go-amqp-consumer", - }), ) if err != nil { panic(err) diff --git a/consumer/config.go b/consumer/config.go index a5613f1..4f71b52 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -24,7 +24,6 @@ type QueueDeclare struct { } type Config[T any] struct { - ctx context.Context serializer serializer.Serializer[T] onError connection.OnErrorFunc onMessageError func(context.Context, *amqp091.Delivery, error) @@ -82,47 +81,3 @@ func WithOnErrorFunc[T any](onError connection.OnErrorFunc) Option[T] { c.onError = onError } } - -func WithContext[T any](ctx context.Context) Option[T] { - return func(c *Config[T]) { - c.ctx = ctx - } -} - -func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T] { - return func(c *Config[T]) { - if connectionOptions.Channels == 0 { - connectionOptions.Channels = connection.DefaultConfig.Channels - } - - if connectionOptions.Vhost == "" { - connectionOptions.Vhost = connection.DefaultConfig.Vhost - } - - if connectionOptions.Host == "" { - connectionOptions.Host = connection.DefaultConfig.Host - } - - if connectionOptions.Port == 0 { - connectionOptions.Port = connection.DefaultConfig.Port - } - - if connectionOptions.User == "" { - connectionOptions.User = connection.DefaultConfig.User - } - - if connectionOptions.Password == "" { - connectionOptions.Password = connection.DefaultConfig.Password - } - - if connectionOptions.ReconnectInterval == 0 { - connectionOptions.ReconnectInterval = connection.DefaultConfig.ReconnectInterval - } - - if connectionOptions.ReconnectRetry == 0 { - connectionOptions.ReconnectRetry = connection.DefaultConfig.ReconnectRetry - } - - c.connectionOptions = connectionOptions - } -} diff --git a/consumer/consumer.go b/consumer/consumer.go index 055c98e..0212222 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -28,34 +28,38 @@ type ( } ) -func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) { +func NewRaw[T Message]( + handler RawHandler, + connectionOptions connection.Config, + queueDeclare QueueDeclare, + options ...Option[T], +) (Consumer[T], error) { var msg T if reflect.ValueOf(msg).Kind() == reflect.Ptr { return Consumer[T]{}, ErrMessageTypeInvalid } - cfg := Config[T]{ + cfg := &Config[T]{ queueConfig: QueueConfig{ PrefetchCount: 128, Workers: 1, }, retryCount: 1, serializer: serializer.JSON[T]{}, - ctx: context.Background(), onError: func(err error) { if errors.Is(err, connection.ErrRetriesExhausted) { panic(err) } }, - connectionOptions: connection.DefaultConfig, + connectionOptions: connectionOptions, onMessageError: nil, onListenerStart: nil, onListenerExit: nil, } for _, o := range options { - o(&cfg) + o(cfg) } if queueDeclare.QueueName == "" { @@ -68,17 +72,22 @@ func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options .. return Consumer[T]{ watcher: semaphore.NewWeighted(int64(cfg.queueConfig.Workers)), - cfg: &cfg, + cfg: cfg, queueDeclare: &queueDeclare, handler: handler, }, nil } -func NewRawFunc[T Message](h RawHandlerFunc, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) { - return NewRaw(h, queueDeclare, options...) +func NewRawFunc[T Message](h RawHandlerFunc, connectionOptions connection.Config, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) { + return NewRaw(h, connectionOptions, queueDeclare, options...) } -func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) { +func NewFunc[T Message]( + h HandlerFunc[T], + connectionOptions connection.Config, + queueDeclare QueueDeclare, + options ...Option[T], +) (Consumer[T], error) { cfg := Config[T]{} for _, o := range options { @@ -110,10 +119,15 @@ func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ... rawHandler = privHandler } - return NewRaw(rawHandler, queueDeclare, options...) + return NewRaw(rawHandler, connectionOptions, queueDeclare, options...) } -func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) { +func New[T Message]( + h Handler[T], + connectionOptions connection.Config, + queueDeclare QueueDeclare, + options ...Option[T], +) (Consumer[T], error) { cfg := Config[T]{} for _, o := range options { @@ -145,9 +159,13 @@ func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T rawHandler = privHandler } - return NewRaw(rawHandler, queueDeclare, options...) + return NewRaw(rawHandler, connectionOptions, queueDeclare, options...) +} + +func (c Consumer[T]) CloseWithContext(ctx context.Context) error { + return c.watcher.Acquire(ctx, int64(c.cfg.queueConfig.Workers)) } func (c Consumer[T]) Close() error { - return c.watcher.Acquire(context.Background(), int64(c.cfg.queueConfig.Workers)) + return c.CloseWithContext(context.Background()) } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 1fcc3db..ce21820 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -36,16 +36,7 @@ func main() { c, err := consumer.NewFunc( handler, - consumer.QueueDeclare{ - ExchangeBindings: []consumer.ExchangeBinding{{ExchangeName: "testing_publisher"}}, - QueueName: "testing_queue", - Durable: true, - }, - consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) { - _, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) - }), - consumer.WithContext[Message](ctx), - consumer.WithConnectionOptions[Message](connection.Config{ + connection.Config{ Host: "127.0.0.1", Port: 5672, User: "guest", @@ -56,6 +47,14 @@ func main() { ReconnectInterval: 1 * time.Second, Channels: 1000, FrameSize: 8192, + }, + consumer.QueueDeclare{ + ExchangeBindings: []consumer.ExchangeBinding{{ExchangeName: "testing_publisher"}}, + QueueName: "testing_queue", + Durable: true, + }, + consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) { + _, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) }), consumer.WithQueueConfig[Message](consumer.QueueConfig{ Workers: 1, From cd91e463fed6fd5ff82825e2724aca110c00d6b2 Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Tue, 23 Apr 2024 20:15:14 +0200 Subject: [PATCH 2/2] Update github actions setup-go@v4 -> setup-go@v5 Signed-off-by: Dusan Malusev --- .github/workflows/test.yml | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4d79343..2694271 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,4 +1,4 @@ -name: 'Testing' +name: "Testing" on: workflow_call: @@ -19,12 +19,12 @@ jobs: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest ports: - - '5672:5672' + - "5672:5672" strategy: matrix: - os: ['ubuntu-latest'] - go: [ '1.22' ] + os: ["ubuntu-latest"] + go: ["1.22"] runs-on: ${{ matrix.os }} @@ -32,13 +32,7 @@ jobs: - uses: actions/checkout@v4 with: fetch-depth: 1 - - uses: actions/cache@v4 - with: - path: ~/go/pkg/mod - key: ${{ runner.os }}-go-tests-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go-tests-${{ hashFiles('**/go.sum') }} - - uses: actions/setup-go@v4 + - uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} - name: Install Task @@ -54,4 +48,4 @@ jobs: uses: codecov/codecov-action@v4.0.1 with: token: ${{ secrets.CODECOV_TOKEN }} - slug: nano-interactive/go-amqp \ No newline at end of file + slug: nano-interactive/go-amqp