Skip to content

Commit

Permalink
Merge pull request #3 from nano-interactive/feat/new-comsumer
Browse files Browse the repository at this point in the history
Refactor connection options in consumer
  • Loading branch information
CodeLieutenant authored Apr 23, 2024
2 parents 12e6391 + cd91e46 commit c297e4d
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 113 deletions.
18 changes: 6 additions & 12 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: 'Testing'
name: "Testing"

on:
workflow_call:
Expand All @@ -19,26 +19,20 @@ jobs:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
ports:
- '5672:5672'
- "5672:5672"

strategy:
matrix:
os: ['ubuntu-latest']
go: [ '1.22' ]
os: ["ubuntu-latest"]
go: ["1.22"]

runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 1
- uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-tests-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-tests-${{ hashFiles('**/go.sum') }}
- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
- name: Install Task
Expand All @@ -54,4 +48,4 @@ jobs:
uses: codecov/codecov-action@v4.0.1
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: nano-interactive/go-amqp
slug: nano-interactive/go-amqp
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![codecov](https://codecov.io/gh/nano-interactive/go-amqp/branch/master/graph/badge.svg?token=JQTAGQ11DS)](https://codecov.io/gh/nano-interactive/go-amqp)
[![Go Report Card](https://goreportcard.com/badge/github.com/nano-interactive/go-amqp)](https://goreportcard.com/report/github.com/nano-interactive/go-amqp)

# Introduction
## Introduction

Having a distributed system with a message broker and no good wrappers for AMQP can be a hard work to develop, maintain and keeps the system running.
Working with async protocol in a language that does not support async code is a nightmare, especially with system that need to run 24/7/365. A lot of things can go wrong (e.g connection breaks, channel closes, memory leaks ...).
Expand Down Expand Up @@ -52,7 +52,6 @@ is places in the library so that the users of the library don't even think about

```go

// consumer (c) is non blocking
c, err := consumer.NewFunc(
handler,
consumer.QueueDeclare{QueueName: "testing_queue"},
Expand All @@ -68,6 +67,8 @@ c, err := consumer.NewFunc(
}),
)

c.Start(context.Background())

time.Sleep(100*time.Second)

c.Close()
Expand All @@ -85,8 +86,8 @@ type Message {
}

func handler(ctx context.Context, msg Message) error {
fmt.Printf("[INFO] Message received: %s\n", msg.Name)
return nil
fmt.Printf("[INFO] Message received: %s\n", msg.Name)
return nil
}
```

Expand All @@ -101,8 +102,8 @@ type Message {
type MyHandler struct{}

func (h MyHandler) Handle(ctx context.Context, msg Message) error {
fmt.Printf("[INFO] Message received: %s\n", msg.Name)
return nil
fmt.Printf("[INFO] Message received: %s\n", msg.Name)
return nil
}

```
Expand Down Expand Up @@ -172,7 +173,6 @@ if err := pub.Close(); err != nil {

```


### Testing Example

`go-amqp` library provides a few testing helpers for integration testing. Functions for setting up AMQP channels, queues and exchanges (binding them together), publishing and consuming messages for asserting.
`go-amqp` library provides a few testing helpers for integration testing. Functions for setting up AMQP channels, queues and exchanges (binding them together), publishing and consuming messages for asserting.
29 changes: 4 additions & 25 deletions amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,11 @@ func handler(_ context.Context, msg Message) error {

func ExampleConsumer() {
c, err := consumer.NewFunc(handler,
connection.DefaultConfig,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
Host: "127.0.0.1",
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-consumer",
}),
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -72,16 +67,11 @@ func (h MyHandler) Handle(_ context.Context, msg Message) error {

func ExampleConsumerWithHandler() {
c, err := consumer.New[Message](MyHandler{},
connection.DefaultConfig,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
Host: "127.0.0.1",
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-consumer",
}),
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -109,17 +99,11 @@ func ExampleConsumerWithSignal() {
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

c, err := consumer.NewFunc(handler,
connection.DefaultConfig,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithContext[Message](ctx),
consumer.WithConnectionOptions[Message](connection.Config{
Host: "127.0.0.1",
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-consumer",
}),
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -157,16 +141,11 @@ func (h MyRawHandler) Handle(_ context.Context, d *amqp091.Delivery) error {

func Example_ConsumerWithRawHandler() {
c, err := consumer.NewRaw(MyRawHandler{},
connection.DefaultConfig,
consumer.QueueDeclare{QueueName: "testing_queue"},
consumer.WithOnMessageError[Message](func(_ context.Context, _ *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithConnectionOptions[Message](connection.Config{
Host: "127.0.0.1",
User: "guest",
Password: "guest",
ConnectionName: "go-amqp-consumer",
}),
)
if err != nil {
panic(err)
Expand Down
45 changes: 0 additions & 45 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type QueueDeclare struct {
}

type Config[T any] struct {
ctx context.Context
serializer serializer.Serializer[T]
onError connection.OnErrorFunc
onMessageError func(context.Context, *amqp091.Delivery, error)
Expand Down Expand Up @@ -82,47 +81,3 @@ func WithOnErrorFunc[T any](onError connection.OnErrorFunc) Option[T] {
c.onError = onError
}
}

func WithContext[T any](ctx context.Context) Option[T] {
return func(c *Config[T]) {
c.ctx = ctx
}
}

func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T] {
return func(c *Config[T]) {
if connectionOptions.Channels == 0 {
connectionOptions.Channels = connection.DefaultConfig.Channels
}

if connectionOptions.Vhost == "" {
connectionOptions.Vhost = connection.DefaultConfig.Vhost
}

if connectionOptions.Host == "" {
connectionOptions.Host = connection.DefaultConfig.Host
}

if connectionOptions.Port == 0 {
connectionOptions.Port = connection.DefaultConfig.Port
}

if connectionOptions.User == "" {
connectionOptions.User = connection.DefaultConfig.User
}

if connectionOptions.Password == "" {
connectionOptions.Password = connection.DefaultConfig.Password
}

if connectionOptions.ReconnectInterval == 0 {
connectionOptions.ReconnectInterval = connection.DefaultConfig.ReconnectInterval
}

if connectionOptions.ReconnectRetry == 0 {
connectionOptions.ReconnectRetry = connection.DefaultConfig.ReconnectRetry
}

c.connectionOptions = connectionOptions
}
}
44 changes: 31 additions & 13 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,38 @@ type (
}
)

func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
func NewRaw[T Message](
handler RawHandler,
connectionOptions connection.Config,
queueDeclare QueueDeclare,
options ...Option[T],
) (Consumer[T], error) {
var msg T

if reflect.ValueOf(msg).Kind() == reflect.Ptr {
return Consumer[T]{}, ErrMessageTypeInvalid
}

cfg := Config[T]{
cfg := &Config[T]{
queueConfig: QueueConfig{
PrefetchCount: 128,
Workers: 1,
},
retryCount: 1,
serializer: serializer.JSON[T]{},
ctx: context.Background(),
onError: func(err error) {
if errors.Is(err, connection.ErrRetriesExhausted) {
panic(err)
}
},
connectionOptions: connection.DefaultConfig,
connectionOptions: connectionOptions,
onMessageError: nil,
onListenerStart: nil,
onListenerExit: nil,
}

for _, o := range options {
o(&cfg)
o(cfg)
}

if queueDeclare.QueueName == "" {
Expand All @@ -68,17 +72,22 @@ func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ..

return Consumer[T]{
watcher: semaphore.NewWeighted(int64(cfg.queueConfig.Workers)),
cfg: &cfg,
cfg: cfg,
queueDeclare: &queueDeclare,
handler: handler,
}, nil
}

func NewRawFunc[T Message](h RawHandlerFunc, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
return NewRaw(h, queueDeclare, options...)
func NewRawFunc[T Message](h RawHandlerFunc, connectionOptions connection.Config, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
return NewRaw(h, connectionOptions, queueDeclare, options...)
}

func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
func NewFunc[T Message](
h HandlerFunc[T],
connectionOptions connection.Config,
queueDeclare QueueDeclare,
options ...Option[T],
) (Consumer[T], error) {
cfg := Config[T]{}

for _, o := range options {
Expand Down Expand Up @@ -110,10 +119,15 @@ func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...
rawHandler = privHandler
}

return NewRaw(rawHandler, queueDeclare, options...)
return NewRaw(rawHandler, connectionOptions, queueDeclare, options...)
}

func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error) {
func New[T Message](
h Handler[T],
connectionOptions connection.Config,
queueDeclare QueueDeclare,
options ...Option[T],
) (Consumer[T], error) {
cfg := Config[T]{}

for _, o := range options {
Expand Down Expand Up @@ -145,9 +159,13 @@ func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T
rawHandler = privHandler
}

return NewRaw(rawHandler, queueDeclare, options...)
return NewRaw(rawHandler, connectionOptions, queueDeclare, options...)
}

func (c Consumer[T]) CloseWithContext(ctx context.Context) error {
return c.watcher.Acquire(ctx, int64(c.cfg.queueConfig.Workers))
}

func (c Consumer[T]) Close() error {
return c.watcher.Acquire(context.Background(), int64(c.cfg.queueConfig.Workers))
return c.CloseWithContext(context.Background())
}
19 changes: 9 additions & 10 deletions examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,7 @@ func main() {

c, err := consumer.NewFunc(
handler,
consumer.QueueDeclare{
ExchangeBindings: []consumer.ExchangeBinding{{ExchangeName: "testing_publisher"}},
QueueName: "testing_queue",
Durable: true,
},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithContext[Message](ctx),
consumer.WithConnectionOptions[Message](connection.Config{
connection.Config{
Host: "127.0.0.1",
Port: 5672,
User: "guest",
Expand All @@ -56,6 +47,14 @@ func main() {
ReconnectInterval: 1 * time.Second,
Channels: 1000,
FrameSize: 8192,
},
consumer.QueueDeclare{
ExchangeBindings: []consumer.ExchangeBinding{{ExchangeName: "testing_publisher"}},
QueueName: "testing_queue",
Durable: true,
},
consumer.WithOnMessageError[Message](func(ctx context.Context, d *amqp091.Delivery, err error) {
_, _ = fmt.Fprintf(os.Stderr, "[ERROR] Message error: %s\n", err)
}),
consumer.WithQueueConfig[Message](consumer.QueueConfig{
Workers: 1,
Expand Down

0 comments on commit c297e4d

Please sign in to comment.