From 14023888a8eefc357b98f917c49c95b7e09d3aed Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Tue, 23 Apr 2024 20:34:20 +0200 Subject: [PATCH 1/3] Refactor connection options in producer Signed-off-by: Dusan Malusev --- README.md | 25 +++++---- amqp_test.go | 8 +-- examples/publisher/main.go | 6 +-- publisher/config.go | 100 ++++++++++++++++++------------------ publisher/publisher.go | 43 +++++++++------- publisher/publisher_test.go | 37 +++++++++---- 6 files changed, 120 insertions(+), 99 deletions(-) diff --git a/README.md b/README.md index dc07a43..564c8c7 100644 --- a/README.md +++ b/README.md @@ -54,23 +54,25 @@ is places in the library so that the users of the library don't even think about c, err := consumer.NewFunc( handler, - consumer.QueueDeclare{QueueName: "testing_queue"}, - consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) { - fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) - }), - consumer.WithConnectionOptions[Message](connection.Config{ + connection.Config{ Host: "127.0.0.1", Port: 5672, User: "guest", Password: "guest", ConnectionName: "go-amqp-consumer", + }, + consumer.QueueDeclare{QueueName: "testing_queue"}, + consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) { + fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err) }), ) -c.Start(context.Background()) +go c.Start(context.Background()) -time.Sleep(100*time.Second) +// Wait for some event to exit the Consumer +time.Sleep(30*time.Second) +// c.CloseWithContext(context.Background()) -> for timeouts c.Close() ``` @@ -134,13 +136,14 @@ Publising message is simple, the abstraction is very simple ```go pub, err := publisher.New[Message]( - "testing_publisher", - publisher.WithConnectionOptions[Message](connection.Config{ + context.TODO(), + connection.Config{ Host: "127.0.0.1", User: "guest", Password: "guest", ConnectionName: "go-amqp-publisher", - }), + } + "testing_publisher", ) if err != nil { panic(err) @@ -156,7 +159,7 @@ message := Message{ Name: "Nano Interactive", } -if err = pub.Publish(context.Background(), message); err != nil { +if err = pub.Publish(context.TODO(), message); err != nil { panic(err) } diff --git a/amqp_test.go b/amqp_test.go index 38c7921..91bc4d6 100644 --- a/amqp_test.go +++ b/amqp_test.go @@ -167,13 +167,9 @@ func Example_ConsumerWithRawHandler() { func ExamplePublisher() { pub, err := publisher.New[Message]( + context.Background(), + connection.DefaultConfig, "testing_publisher", - publisher.WithConnectionOptions[Message](connection.Config{ - Host: "127.0.0.1", - User: "guest", - Password: "guest", - ConnectionName: "go-amqp-publisher", - }), ) if err != nil { panic(err) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index d6ee5bb..6cedae1 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -30,10 +30,10 @@ func main() { ctx := context.Background() - pub, err := publisher.New( + pub, err := publisher.New[Message]( + ctx, + connConfig, "testing_publisher", - publisher.WithContext[Message](ctx), - publisher.WithConnectionOptions[Message](connConfig), ) if err != nil { panic(err) diff --git a/publisher/config.go b/publisher/config.go index cbf8d72..a83c629 100644 --- a/publisher/config.go +++ b/publisher/config.go @@ -1,20 +1,18 @@ package publisher import ( - "context" - "github.com/nano-interactive/go-amqp/v3/connection" "github.com/nano-interactive/go-amqp/v3/serializer" ) type ( Config[T any] struct { - ctx context.Context - serializer serializer.Serializer[T] - onError connection.OnErrorFunc - exchange ExchangeDeclare - connectionOptions connection.Config - messageBuffering int + // ctx context.Context + serializer serializer.Serializer[T] + onError connection.OnErrorFunc + exchange ExchangeDeclare + // connectionOptions connection.Config + messageBuffering int } Option[T any] func(*Config[T]) @@ -38,49 +36,49 @@ func WithOnErrorFunc[T any](onError connection.OnErrorFunc) Option[T] { } } -func WithContext[T any](ctx context.Context) Option[T] { - return func(c *Config[T]) { - c.ctx = ctx - } -} - -func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T] { - return func(c *Config[T]) { - if connectionOptions.Channels == 0 { - connectionOptions.Channels = connection.DefaultConfig.Channels - } - - if connectionOptions.Vhost == "" { - connectionOptions.Vhost = connection.DefaultConfig.Vhost - } - - if connectionOptions.Host == "" { - connectionOptions.Host = connection.DefaultConfig.Host - } - - if connectionOptions.Port == 0 { - connectionOptions.Port = connection.DefaultConfig.Port - } - - if connectionOptions.User == "" { - connectionOptions.User = connection.DefaultConfig.User - } - - if connectionOptions.Password == "" { - connectionOptions.Password = connection.DefaultConfig.Password - } - - if connectionOptions.ReconnectInterval == 0 { - connectionOptions.ReconnectInterval = connection.DefaultConfig.ReconnectInterval - } - - if connectionOptions.ReconnectRetry == 0 { - connectionOptions.ReconnectRetry = connection.DefaultConfig.ReconnectRetry - } - - c.connectionOptions = connectionOptions - } -} +// func WithContext[T any](ctx context.Context) Option[T] { +// return func(c *Config[T]) { +// c.ctx = ctx +// } +// } + +// func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T] { +// return func(c *Config[T]) { +// if connectionOptions.Channels == 0 { +// connectionOptions.Channels = connection.DefaultConfig.Channels +// } + +// if connectionOptions.Vhost == "" { +// connectionOptions.Vhost = connection.DefaultConfig.Vhost +// } + +// if connectionOptions.Host == "" { +// connectionOptions.Host = connection.DefaultConfig.Host +// } + +// if connectionOptions.Port == 0 { +// connectionOptions.Port = connection.DefaultConfig.Port +// } + +// if connectionOptions.User == "" { +// connectionOptions.User = connection.DefaultConfig.User +// } + +// if connectionOptions.Password == "" { +// connectionOptions.Password = connection.DefaultConfig.Password +// } + +// if connectionOptions.ReconnectInterval == 0 { +// connectionOptions.ReconnectInterval = connection.DefaultConfig.ReconnectInterval +// } + +// if connectionOptions.ReconnectRetry == 0 { +// connectionOptions.ReconnectRetry = connection.DefaultConfig.ReconnectRetry +// } + +// c.connectionOptions = connectionOptions +// } +// } func WithBufferedMessages[T any](capacity int) Option[T] { return func(c *Config[T]) { diff --git a/publisher/publisher.go b/publisher/publisher.go index a46c0ee..c801f66 100644 --- a/publisher/publisher.go +++ b/publisher/publisher.go @@ -6,11 +6,11 @@ import ( "fmt" "io" "os" - "sync" "sync/atomic" "time" "github.com/rabbitmq/amqp091-go" + "golang.org/x/sync/semaphore" "github.com/nano-interactive/go-amqp/v3/connection" "github.com/nano-interactive/go-amqp/v3/serializer" @@ -35,10 +35,9 @@ type ( serializer serializer.Serializer[T] conn *connection.Connection ch atomic.Pointer[amqp091.Channel] - cancel context.CancelFunc + semaphore *semaphore.Weighted exchangeName string routingKey string - wg sync.WaitGroup closing atomic.Bool gettingCh atomic.Bool } @@ -77,7 +76,7 @@ func (p *Publisher[T]) swapChannel(connection *amqp091.Connection, cfg Config[T] } func (p *Publisher[T]) connectionReadyWorker(ctx context.Context, conn *amqp091.Connection, notifyClose chan *amqp091.Error, cfg Config[T]) { - defer p.wg.Done() + defer p.semaphore.Release(1) errCh := make(chan error) defer close(errCh) var err error @@ -121,7 +120,10 @@ func (p *Publisher[T]) connectionReadyWorker(ctx context.Context, conn *amqp091. func (p *Publisher[T]) onConnectionReady(cfg Config[T]) connection.OnConnectionReady { return func(ctx context.Context, connection *amqp091.Connection) error { - p.wg.Add(1) + if err := p.semaphore.Acquire(ctx, 1); err != nil { + return err + } + notifyClose, err := p.swapChannel(connection, cfg) if err != nil { return err @@ -150,16 +152,14 @@ func newChannel( return ch, notifyClose, nil } -func New[T any](exchangeName string, options ...Option[T]) (*Publisher[T], error) { +func New[T any](ctx context.Context, connectionOpts connection.Config, exchangeName string, options ...Option[T]) (*Publisher[T], error) { if exchangeName == "" { return nil, ErrExchangeNameRequired } cfg := Config[T]{ - serializer: serializer.JSON[T]{}, - messageBuffering: 1, - connectionOptions: connection.DefaultConfig, - ctx: context.Background(), + serializer: serializer.JSON[T]{}, + messageBuffering: 1, exchange: ExchangeDeclare{ name: exchangeName, RoutingKey: "", @@ -183,16 +183,14 @@ func New[T any](exchangeName string, options ...Option[T]) (*Publisher[T], error option(&cfg) } - ctx, cancel := context.WithCancel(cfg.ctx) - publisher := &Publisher[T]{ serializer: cfg.serializer, - cancel: cancel, exchangeName: exchangeName, routingKey: cfg.exchange.RoutingKey, + semaphore: semaphore.NewWeighted(1), } - conn, err := connection.New(ctx, cfg.connectionOptions, connection.Events{ + conn, err := connection.New(ctx, connectionOpts, connection.Events{ OnConnectionReady: publisher.onConnectionReady(cfg), OnBeforeConnectionReady: func(_ context.Context) error { publisher.gettingCh.Store(true) @@ -239,9 +237,18 @@ func (p *Publisher[T]) Publish(ctx context.Context, msg T, _ ...PublishConfig) e ) } -func (p *Publisher[T]) Close() error { +func (p *Publisher[T]) CloseWithContext(ctx context.Context) error { p.closing.Store(true) - p.cancel() - p.wg.Wait() - return p.conn.Close() + + if err := p.semaphore.Acquire(ctx, 1); err != nil { + return err + } + + defer p.conn.Close() + + return nil +} + +func (p *Publisher[T]) Close() error { + return p.CloseWithContext(context.Background()) } diff --git a/publisher/publisher_test.go b/publisher/publisher_test.go index 42068bb..05191eb 100644 --- a/publisher/publisher_test.go +++ b/publisher/publisher_test.go @@ -46,7 +46,7 @@ func TestPublisherNew(t *testing.T) { mappings := amqp_testing.NewMappings(t). AddMapping("test_exchange", "test_queue") - pub, err := publisher.New[Msg](mappings.Exchange("test_exchange")) + pub, err := publisher.New[Msg](context.TODO(), connection.DefaultConfig, mappings.Exchange("test_exchange")) assert.NoError(err) assert.NotNil(pub) @@ -59,8 +59,9 @@ func TestPublisherNew(t *testing.T) { AddMapping("test_exchange", "test_queue") pub, err := publisher.New[Msg]( + context.TODO(), + connection.DefaultConfig, mappings.Exchange("test_exchange"), - publisher.WithContext[Msg](context.Background()), publisher.WithSerializer[Msg](serializer.JSON[Msg]{}), ) @@ -76,13 +77,13 @@ func TestPublisherNew(t *testing.T) { t.Cleanup(cancel) pub, err := publisher.New[Msg]( - "test_exchange", - publisher.WithContext[Msg](ctx), - publisher.WithConnectionOptions[Msg](connection.Config{ + ctx, + connection.Config{ Host: "localhost", Port: 1234, ReconnectRetry: 2, - }), + }, + "test_exchange", ) assert.Error(err) @@ -100,7 +101,7 @@ func TestPublisherPublish(t *testing.T) { mappings := amqp_testing.NewMappings(t). AddMapping("test_exchange_basic", "test_queue_basic") - pub, err := publisher.New[Msg](mappings.Exchange("test_exchange_basic")) + pub, err := publisher.New[Msg](context.TODO(), connection.DefaultConfig, mappings.Exchange("test_exchange_basic")) assert.NoError(err) assert.NotNil(pub) @@ -122,6 +123,8 @@ func TestPublisherPublish(t *testing.T) { mockSerializer := &MockSerializer{} pub, err := publisher.New[Msg]( + context.TODO(), + connection.DefaultConfig, mappings.Exchange("test_exchange_serializer"), publisher.WithSerializer[Msg](mockSerializer), ) @@ -160,6 +163,8 @@ func TestPublisherPublish(t *testing.T) { mockSerializer := &MockSerializer{} pub, err := publisher.New[Msg]( + context.TODO(), + connection.DefaultConfig, mappings.Exchange("test_exchange_serializer_fails"), publisher.WithSerializer[Msg](mockSerializer), ) @@ -199,7 +204,11 @@ func TestPublisherClose(t *testing.T) { mappings := amqp_testing.NewMappings(t). AddMapping("test_exchange_close", "test_queue_close") - pub, err := publisher.New[Msg](mappings.Exchange("test_exchange_close")) + pub, err := publisher.New[Msg]( + context.TODO(), + connection.DefaultConfig, + mappings.Exchange("test_exchange_close"), + ) assert.NoError(err) assert.NotNil(pub) @@ -211,7 +220,11 @@ func TestPublisherClose(t *testing.T) { mappings := amqp_testing.NewMappings(t). AddMapping("test_exchange_after_close", "test_queue_after_close") - pub, err := publisher.New[Msg](mappings.Exchange("test_exchange_after_close")) + pub, err := publisher.New[Msg]( + context.TODO(), + connection.DefaultConfig, + mappings.Exchange("test_exchange_after_close"), + ) assert.NoError(err) assert.NotNil(pub) @@ -225,7 +238,11 @@ func TestPublisherClose(t *testing.T) { mappings := amqp_testing.NewMappings(t). AddMapping("test_exchange_multiple_close_call", "test_queue_multiple_close_call") - pub, err := publisher.New[Msg](mappings.Exchange("test_exchange_multiple_close_call")) + pub, err := publisher.New[Msg]( + context.TODO(), + connection.DefaultConfig, + mappings.Exchange("test_exchange_multiple_close_call"), + ) assert.NoError(err) assert.NotNil(pub) From dfd6a6ba78d8e7b49cdb27562079da21761f08eb Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Tue, 23 Apr 2024 20:43:48 +0200 Subject: [PATCH 2/3] Fix issue with close -> close cannot be called multiple times Signed-off-by: Dusan Malusev --- publisher/publisher.go | 13 ++++++++++--- publisher/publisher_test.go | 19 ------------------- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/publisher/publisher.go b/publisher/publisher.go index c801f66..6cadd7c 100644 --- a/publisher/publisher.go +++ b/publisher/publisher.go @@ -36,6 +36,7 @@ type ( conn *connection.Connection ch atomic.Pointer[amqp091.Channel] semaphore *semaphore.Weighted + cancel context.CancelFunc exchangeName string routingKey string closing atomic.Bool @@ -76,12 +77,11 @@ func (p *Publisher[T]) swapChannel(connection *amqp091.Connection, cfg Config[T] } func (p *Publisher[T]) connectionReadyWorker(ctx context.Context, conn *amqp091.Connection, notifyClose chan *amqp091.Error, cfg Config[T]) { - defer p.semaphore.Release(1) errCh := make(chan error) - defer close(errCh) - var err error defer func() { + close(errCh) + p.semaphore.Release(1) p.closing.Store(true) ch := p.ch.Load() if !ch.IsClosed() { @@ -89,6 +89,8 @@ func (p *Publisher[T]) connectionReadyWorker(ctx context.Context, conn *amqp091. } }() + var err error + for { select { case <-ctx.Done(): @@ -183,11 +185,14 @@ func New[T any](ctx context.Context, connectionOpts connection.Config, exchangeN option(&cfg) } + ctx, cancel := context.WithCancel(ctx) + publisher := &Publisher[T]{ serializer: cfg.serializer, exchangeName: exchangeName, routingKey: cfg.exchange.RoutingKey, semaphore: semaphore.NewWeighted(1), + cancel: cancel, } conn, err := connection.New(ctx, connectionOpts, connection.Events{ @@ -203,6 +208,7 @@ func New[T any](ctx context.Context, connectionOpts connection.Config, exchangeN } publisher.conn = conn + return publisher, nil } @@ -239,6 +245,7 @@ func (p *Publisher[T]) Publish(ctx context.Context, msg T, _ ...PublishConfig) e func (p *Publisher[T]) CloseWithContext(ctx context.Context) error { p.closing.Store(true) + p.cancel() if err := p.semaphore.Acquire(ctx, 1); err != nil { return err diff --git a/publisher/publisher_test.go b/publisher/publisher_test.go index 05191eb..b2aeebe 100644 --- a/publisher/publisher_test.go +++ b/publisher/publisher_test.go @@ -231,23 +231,4 @@ func TestPublisherClose(t *testing.T) { assert.NoError(pub.Close()) assert.ErrorIs(pub.Publish(context.Background(), Msg{Name: "test"}), publisher.ErrClosed) }) - - t.Run("Multiple_Close_Calling", func(t *testing.T) { - t.Parallel() - - mappings := amqp_testing.NewMappings(t). - AddMapping("test_exchange_multiple_close_call", "test_queue_multiple_close_call") - - pub, err := publisher.New[Msg]( - context.TODO(), - connection.DefaultConfig, - mappings.Exchange("test_exchange_multiple_close_call"), - ) - assert.NoError(err) - assert.NotNil(pub) - - assert.NoError(pub.Close()) - assert.NoError(pub.Close()) - assert.NoError(pub.Close()) - }) } From 0a9f6c78da5e2dad67839ba53ae98f5717299962 Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Tue, 23 Apr 2024 20:46:45 +0200 Subject: [PATCH 3/3] Update github actions setup-task@v1 -> setup-task@v2 Signed-off-by: Dusan Malusev --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2694271..ea8d053 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,7 +36,7 @@ jobs: with: go-version: ${{ matrix.go }} - name: Install Task - uses: arduino/setup-task@v1 + uses: arduino/setup-task@v2 with: version: 3.x repo-token: ${{ secrets.GITHUB_TOKEN }} @@ -45,7 +45,7 @@ jobs: go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest task test - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v4.0.1 + uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} slug: nano-interactive/go-amqp