Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove handler groups from _examples/basic/5-cqrs-protobuf and polish it a bit #525

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion _examples/basic/5-cqrs-protobuf/.validate_example.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
validation_cmd: "docker compose up"
teardown_cmd: "docker compose down"
timeout: 120
expected_output: "beers ordered to room 3"
expected_outputs:
- "beers ordered to room \\d+"
- "Already booked rooms for \\$\\d{2,}"
29 changes: 29 additions & 0 deletions _examples/basic/5-cqrs-protobuf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,35 @@ Detailed documentation for CQRS can be found in Watermill's docs: [http://waterm

![CQRS Event Storming](https://threedots.tech/watermill-io/cqrs-example-storming.png)

```mermaid
sequenceDiagram
participant M as Main
participant CB as CommandBus
participant BRH as BookRoomHandler
participant EB as EventBus
participant OBRB as OrderBeerOnRoomBooked
participant OBH as OrderBeerHandler
participant BFR as BookingsFinancialReport

Note over M,BFR: Commands use AMQP queue, Events use AMQP pub/sub

M->>CB: Send(BookRoom)<br/>topic: commands.BookRoom
CB->>BRH: Handle(BookRoom)

BRH->>EB: Publish(RoomBooked)<br/>topic: events.RoomBooked

par Process RoomBooked Event
EB->>OBRB: Handle(RoomBooked)
OBRB->>CB: Send(OrderBeer)<br/>topic: commands.OrderBeer
CB->>OBH: Handle(OrderBeer)
OBH->>EB: Publish(BeerOrdered)<br/>topic: events.BeerOrdered

EB->>BFR: Handle(RoomBooked)
Note over BFR: Updates financial report
end
```


## Running

```bash
Expand Down
1 change: 1 addition & 0 deletions _examples/basic/5-cqrs-protobuf/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ services:
rabbitmq:
image: rabbitmq:3.7
restart: unless-stopped
attach: false
2 changes: 1 addition & 1 deletion _examples/basic/5-cqrs-protobuf/go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module main.go

require (
github.com/ThreeDotsLabs/watermill v1.3.7
github.com/ThreeDotsLabs/watermill v1.4.2
github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0
github.com/golang/protobuf v1.5.4
)
Expand Down
6 changes: 6 additions & 0 deletions _examples/basic/5-cqrs-protobuf/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o=
github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.1 h1:gjP6yZH+otMPjV0KsV07pl9TeMm9UQV/gqiuiuG5Drs=
github.com/ThreeDotsLabs/watermill v1.4.1/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339 h1:Q6joJUSSelwcxHEngdu3PGy4UYqk7BMXKC3Rzer3Xuk=
github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.2 h1:lX/J79HyUipxZ2VetC7vMPqlw29xreHMxzhPlcZnYoQ=
github.com/ThreeDotsLabs/watermill v1.4.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0 h1:r5idq2qkd3M345iv3C3zAX+lFlEu7iW8QESNnuuv4eY=
github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0/go.mod h1:+8tCh6VCuBcQWhfETCwzRINKQ1uyeg9moH3h7jMKxQk=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
Expand Down
113 changes: 60 additions & 53 deletions _examples/basic/5-cqrs-protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package main
import (
"context"
"fmt"
"log"
"log/slog"
"math/rand"
"sync"
"time"

"github.com/golang/protobuf/ptypes"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"google.golang.org/protobuf/types/known/timestamppb"
)

// BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
Expand All @@ -29,12 +28,12 @@ func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error {
// some random price, in production you probably will calculate in wiser way
price := (rand.Int63n(40) + 1) * 10

log.Printf(
"Booked %s for %s from %s to %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
slog.Info(
"Booked room",
"room_id", cmd.RoomId,
"guest_name", cmd.GuestName,
"start_date", time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
"end_date", time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)

// RoomBooked will be handled by OrderBeerOnRoomBooked event handler,
Expand Down Expand Up @@ -73,10 +72,6 @@ type OrderBeerHandler struct {
eventBus *cqrs.EventBus
}

func (o OrderBeerHandler) HandlerName() string {
return "OrderBeerHandler"
}

func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
if rand.Int63n(10) == 0 {
// sometimes there is no beer left, command will be retried
Expand All @@ -90,7 +85,7 @@ func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
return err
}

log.Printf("%d beers ordered to room %s", cmd.Count, cmd.RoomId)
slog.Info(fmt.Sprintf("%d beers ordered to room %s", cmd.Count, cmd.RoomId))
return nil
}

Expand Down Expand Up @@ -123,19 +118,36 @@ func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked)

b.totalCharge += event.Price

fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)
slog.Info(fmt.Sprintf(">>> Already booked rooms for $%d\n", b.totalCharge))
return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
logger := watermill.NewSlogLoggerWithLevelMapping(nil, map[slog.Level]slog.Level{
slog.LevelInfo: slog.LevelDebug,
})

cqrsMarshaler := cqrs.ProtobufMarshaler{
// It will generate topic names based on the event/command type.
// So for example, for "RoomBooked" name will be "RoomBooked".
//
// This value is used to generate topic names with "generateEventsTopic" and "generateCommandsTopic" functions.
GenerateName: cqrs.StructName,
}

generateEventsTopic := func(eventName string) string {
return "events." + eventName
}

generateCommandsTopic := func(commandName string) string {
return "commands." + commandName
}

// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
// Commands will be sent to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
Expand Down Expand Up @@ -168,8 +180,7 @@ func main() {

commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
return generateCommandsTopic(params.CommandName), nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
Expand All @@ -191,8 +202,7 @@ func main() {
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
return generateCommandsTopic(params.CommandName), nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
Expand Down Expand Up @@ -221,11 +231,7 @@ func main() {

eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil

// we can also use topic per event type
// return params.EventName, nil
return generateEventsTopic(params.EventName), nil
},

OnPublish: func(params cqrs.OnEventSendParams) error {
Expand All @@ -245,22 +251,22 @@ func main() {
panic(err)
}

eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
eventProcessor, err := cqrs.NewEventProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return generateEventsTopic(params.EventName), nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
amqp.GenerateQueueNameTopicNameWithSuffix(params.HandlerName),
)

return amqp.NewSubscriber(config, logger)
},

OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
OnHandle: func(params cqrs.EventProcessorOnHandleParams) error {
start := time.Now()

err := params.Handler.Handle(params.Message.Context(), params.Event)
Expand Down Expand Up @@ -290,16 +296,24 @@ func main() {
panic(err)
}

err = eventProcessor.AddHandlersGroup(
"events",
cqrs.NewGroupEventHandler(OrderBeerOnRoomBooked{commandBus}.Handle),
cqrs.NewGroupEventHandler(NewBookingsFinancialReport().Handle),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
err = eventProcessor.AddHandlers(
cqrs.NewEventHandler(
"OrderBeerOnRoomBooked",
OrderBeerOnRoomBooked{commandBus}.Handle,
),
cqrs.NewEventHandler(
"LogBeerOrdered",
func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
},
),
cqrs.NewEventHandler(
"BookingsFinancialReport",
NewBookingsFinancialReport().Handle,
),
)
if err != nil {
panic(err)
Expand All @@ -319,15 +333,8 @@ func publishCommands(commandBus *cqrs.CommandBus) func() {
for {
i++

startDate, err := ptypes.TimestampProto(time.Now())
if err != nil {
panic(err)
}

endDate, err := ptypes.TimestampProto(time.Now().Add(time.Hour * 24 * 3))
if err != nil {
panic(err)
}
startDate := timestamppb.New(time.Now())
endDate := timestamppb.New(time.Now().Add(time.Hour * 24 * 3))

bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
Expand Down
4 changes: 4 additions & 0 deletions _examples/basic/5-cqrs-protobuf/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.PHONY: proto

proto:
protoc --proto_path=proto --go_out=. --go_opt=paths=source_relative messages.proto
10 changes: 10 additions & 0 deletions _examples/basic/6-cqrs-ordered-events/.validate_example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
validation_cmd: "docker compose up"
teardown_cmd: "docker compose down"
timeout: 120
expected_outputs:
- "Subscriber added subscriber_id"
- "Subscriber updated subscriber_id"
- "Subscriber removed subscriber_id"
- "\\[ACTIVITY\\] activity_type=SUBSCRIBED"
- "\\[ACTIVITY\\] activity_type=UNSUBSCRIBED"
- "\\[ACTIVITY\\] activity_type=EMAIL_UPDATED"
38 changes: 38 additions & 0 deletions _examples/basic/6-cqrs-ordered-events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Example Golang CQRS application - ordered events with Kafka

This application is using [Watermill CQRS](http://watermill.io/docs/cqrs) component.

Detailed documentation for CQRS can be found in Watermill's docs: [http://watermill.io/docs/cqrs#usage](http://watermill.io/docs/cqrs).

This example, uses event groups to keep order for multiple events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/).
We are also using Kafka's partitioning keys to increase processing throughput without losing order of events.


## What does this application do?

This application manages an email subscription system where users can:

1. Subscribe to receive emails by providing their email address
2. Update their email address after subscribing
3. Unsubscribe from the mailing list

The system maintains:
- A current list of all active subscribers
- A timeline of all subscription-related activities

In this example, keeping order of events is crucial.
If events won't be ordered, and `SubscriberSubscribed` would arrive after `SubscriberUnsubscribed` event,
the subscriber will be still subscribed.

## Possible improvements

In this example, we are using global `events` and `commands` topics.
You can consider splitting them into smaller topics, for example, per aggregate type.

Thanks to that, you can scale your application horizontally and increase the throughput and processing less events.

## Running

```bash
docker-compose up
```
Loading
Loading