CQRS + NATS example #530

Closed
desprit opened this issue Jan 3, 2025 · 1 comment

desprit commented Jan 3, 2025

I'm trying to follow the CQRS docs; this PR also helped me a lot:
#525

import (
        ...
        watermill_nats "github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats"
        watermill_cqrs "github.com/ThreeDotsLabs/watermill/components/cqrs"
)

type NatsMarshaler struct{}

func (m *NatsMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error) {
	return &nats.Msg{
		Subject: topic,
		Data:    msg.Payload,
	}, nil
}

func (m *NatsMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error) {
	return &message.Message{
		Payload: natsMsg.Data,
	}, nil
}

func (c *cqrs) Start(ctx context.Context) error {
	logger := watermill.NewStdLogger(false, false)
	jsConfig := watermill_nats.JetStreamConfig{Disabled: true}
	cqrsMarshaler := watermill_cqrs.ProtobufMarshaler{GenerateName: watermill_cqrs.StructName}
	natsMarshaller := &NatsMarshaler{}

	// Publisher

	publisher, err := watermill_nats.NewPublisherWithNatsConn(c.nc, watermill_nats.PublisherPublishConfig{
		Marshaler: natsMarshaller,
		JetStream: jsConfig,
	}, logger)
	if err != nil {
		return fmt.Errorf("failed to create commands publisher: %w", err)
	}

	// Router

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		return fmt.Errorf("failed to create router: %w", err)
	}
	router.AddMiddleware(middleware.Recoverer)
	router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			fmt.Printf("Received message metadata %+v", msg.Metadata)
			return h(msg)
		}
	})

	// BUS

	commandBus, err := watermill_cqrs.NewCommandBusWithConfig(publisher, watermill_cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params watermill_cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			println("commandBus.GeneratePublishTopic")
			return generateCommandsTopic(params.CommandName), nil
		},
		OnSend: func(params watermill_cqrs.CommandBusOnSendParams) error {
			println("commandBus.OnSend")
			fmt.Printf("Params %+v", params)
			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		return fmt.Errorf("failed to create command bus: %w", err)
	}
	c.commandBus = commandBus

	eventBus, err := watermill_cqrs.NewEventBusWithConfig(publisher, watermill_cqrs.EventBusConfig{
		GeneratePublishTopic: func(params watermill_cqrs.GenerateEventPublishTopicParams) (string, error) {
			println("eventBus.GeneratePublishTopic")
			return generateEventsTopic(params.EventName), nil
		},
		OnPublish: func(params watermill_cqrs.OnEventSendParams) error {
			println("eventBus.OnPublish")
			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		return fmt.Errorf("failed to create event bus: %w", err)
	}
	c.eventBus = eventBus

	// Processor

	commandProcessor, err := watermill_cqrs.NewCommandProcessorWithConfig(
		router,
		watermill_cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params watermill_cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				println("commandProcessor.GenerateSubscribeTopic")
				return generateCommandsTopic(params.CommandName), nil
			},
			SubscriberConstructor: func(params watermill_cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				println("commandProcessor.SubscriberConstructor")
				return watermill_nats.NewSubscriberWithNatsConn(c.nc, watermill_nats.SubscriberSubscriptionConfig{
					CloseTimeout:   30 * time.Second,
					AckWaitTimeout: 30 * time.Second,
					JetStream:      jsConfig,
					Unmarshaler:    natsMarshaller,
				}, logger)
			},
			OnHandle: func(params watermill_cqrs.CommandProcessorOnHandleParams) error {
				println("commandProcessor.OnHandle")
				return params.Handler.Handle(params.Message.Context(), params.Command)
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		return fmt.Errorf("failed to create command processor: %w", err)
	}

	eventProcessor, err := watermill_cqrs.NewEventProcessorWithConfig(
		router,
		watermill_cqrs.EventProcessorConfig{
			GenerateSubscribeTopic: func(params watermill_cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
				println("eventProcessor.GenerateSubscribeTopic")
				return generateEventsTopic(params.EventName), nil
			},
			SubscriberConstructor: func(params watermill_cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				println("eventProcessor.SubscriberConstructor")
				return watermill_nats.NewSubscriberWithNatsConn(c.nc, watermill_nats.SubscriberSubscriptionConfig{
					CloseTimeout:   30 * time.Second,
					AckWaitTimeout: 30 * time.Second,
					JetStream:      jsConfig,
					Unmarshaler:    natsMarshaller,
				}, logger)
			},
			OnHandle: func(params watermill_cqrs.EventProcessorOnHandleParams) error {
				println("eventProcessor.OnHandle")
				return params.Handler.Handle(params.Message.Context(), params.Event)
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		return fmt.Errorf("failed to create event processor: %w", err)
	}

	// Handler

	err = commandProcessor.AddHandlers(
		watermill_cqrs.NewCommandHandler("CreateNlpRequestHandler", cmd.CreateNlpRequestHandler{EventBus: eventBus}.Handle),
	)
	if err != nil {
		return fmt.Errorf("failed to add command handlers: %w", err)
	}

	err = eventProcessor.AddHandlers(
		watermill_cqrs.NewEventHandler("NlpRequestReceived", evt.NlpRequestReceived{CommandBus: commandBus}.Handle),
		watermill_cqrs.NewEventHandler("NlpRequestCreated", evt.NlpRequestCreated{CommandBus: commandBus}.Handle),
	)
	if err != nil {
		return fmt.Errorf("failed to add event handlers: %w", err)
	}

	if err := router.Run(ctx); err != nil {
		return fmt.Errorf("failed to start router: %w", err)
	}

	return nil
}

func generateEventsTopic(eventName string) string {
	return "events." + eventName
}

func generateCommandsTopic(commandName string) string {
	return "commands." + commandName
}

The command handler is dead simple:

// CreateNlpRequestHandler is a command handler, which handles CreateNlpRequest command and emits NlpRequestCreated.
type CreateNlpRequestHandler struct {
	EventBus *watermill_cqrs.EventBus
}

func (b CreateNlpRequestHandler) Handle(ctx context.Context, cmd *go_proto.CreateNlpRequest) error {
	println("CreateNlpRequestHandler.Handle")
	if err := b.EventBus.Publish(ctx, &go_proto.NlpRequestCreated{}); err != nil {
		return err
	}

	return nil
}

Logs:

commandProcessor.GenerateSubscribeTopic
commandProcessor.SubscriberConstructor
[watermill] 2025/01/03 18:32:02.271685 router.go:280: 	level=INFO  msg="Adding handler" handler_name=CreateNlpRequestHandler topic=commands.CreateNlpRequest
eventProcessor.GenerateSubscribeTopic
eventProcessor.SubscriberConstructor
[watermill] 2025/01/03 18:32:02.271736 router.go:280: 	level=INFO  msg="Adding handler" handler_name=NlpRequestReceived topic=events.NlpRequestReceived
eventProcessor.GenerateSubscribeTopic
eventProcessor.SubscriberConstructor
[watermill] 2025/01/03 18:32:02.271744 router.go:280: 	level=INFO  msg="Adding handler" handler_name=NlpRequestCreated topic=events.NlpRequestCreated
[watermill] 2025/01/03 18:32:02.271748 router.go:414: 	level=INFO  msg="Running router handlers" count=3
[watermill] 2025/01/03 18:32:02.271806 router.go:618: 	level=INFO  msg="Starting handler" subscriber_name=NlpRequestReceived topic=events.NlpRequestReceived
[watermill] 2025/01/03 18:32:02.271816 router.go:618: 	level=INFO  msg="Starting handler" subscriber_name=NlpRequestCreated topic=events.NlpRequestCreated
[watermill] 2025/01/03 18:32:02.271851 router.go:618: 	level=INFO  msg="Starting handler" subscriber_name=CreateNlpRequestHandler topic=commands.CreateNlpRequest
commandBus.GeneratePublishTopic
commandBus.OnSend
Params {CommandName:CreateNlpRequest Command:traceparent:"123" Message:0x140000a4660}Received message metadata map[][watermill] 2025/01/03 18:32:09.274532 router.go:461: 	level=INFO  msg="Subscriber stopped" subscriber_name=NlpRequestCreated topic=events.NlpRequestCreated
[watermill] 2025/01/03 18:32:09.274699 router.go:461: 	level=INFO  msg="Subscriber stopped" subscriber_name=CreateNlpRequestHandler topic=commands.CreateNlpRequest
[watermill] 2025/01/03 18:32:09.274760 router.go:461: 	level=INFO  msg="Subscriber stopped" subscriber_name=NlpRequestReceived topic=events.NlpRequestReceived
[watermill] 2025/01/03 18:32:09.274781 router.go:550: 	level=INFO  msg="Closing router"
[watermill] 2025/01/03 18:32:09.274824 router.go:393: 	level=INFO  msg="Waiting for messages" timeout=30s
[watermill] 2025/01/03 18:32:09.274840 router.go:561: 	level=INFO  msg="Router closed"
[watermill] 2025/01/03 18:32:09.274851 router.go:399: 	level=INFO  msg="All messages processed"

The problem is that my command handler never executes: commandBus.OnSend runs, and then nothing happens. I might have configured the topics incorrectly, but I tried many permutations and nothing worked. I'm currently running it without JetStream.

Has anyone already succeeded with a CQRS + NATS integration?
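
To rule out a topic mismatch, the publish and subscribe topic names can be compared in isolation. The sketch below is standard library only: `structName` is a hypothetical stand-in for name generation by struct type (it keeps the last dot-separated segment of the `%T` representation), and `CreateNlpRequest` is an empty placeholder rather than the real generated protobuf type. Under those assumptions both sides resolve to the same topic that appears in the logs above, commands.CreateNlpRequest.

```go
package main

import (
	"fmt"
	"strings"
)

// CreateNlpRequest is an empty placeholder for the real command type.
type CreateNlpRequest struct{}

// structName is a hypothetical stand-in for struct-based name generation:
// it formats the value's type and keeps only the last dot-separated segment,
// so "*main.CreateNlpRequest" becomes "CreateNlpRequest".
func structName(v any) string {
	segments := strings.Split(fmt.Sprintf("%T", v), ".")
	return segments[len(segments)-1]
}

// generateCommandsTopic mirrors the helper from the snippet above.
func generateCommandsTopic(commandName string) string {
	return "commands." + commandName
}

func main() {
	// Publishing side: the topic is derived from the command value.
	publishTopic := generateCommandsTopic(structName(&CreateNlpRequest{}))
	// Subscribing side: the topic is derived from the handler's command name.
	subscribeTopic := generateCommandsTopic("CreateNlpRequest")

	fmt.Println(publishTopic, publishTopic == subscribeTopic)
}
```

If the two names agree, as they do here, the topic configuration is probably not what prevents the handler from running.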

desprit commented Jan 8, 2025

I eventually made it work by starting from the CQRS docs example and replacing it piece by piece until it clicked. In my example above, it is important to replace cqrsMarshaler with the following one:
https://github.com/ThreeDotsLabs/watermill/blob/7ab13543158c074f24233732da36c8315f1ab9fc/components/cqrs/marshaler_protobuf.go which uses "google.golang.org/protobuf/proto" instead of "github.com/gogo/protobuf/proto".

In the commandProcessor and eventProcessor I ended up with the following SubscriberConstructor:

watermill_nats.NewSubscriberWithNatsConn(c.nc, watermill_nats.SubscriberSubscriptionConfig{
        JetStream: jsConfig,
}, logger)

My Watermill+NATS+Protobuf example doesn't use JetStream, by the way. I don't know much about NATS, so I went with the most straightforward approach for now.
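
A possible contributing factor, not confirmed in this thread: the custom NatsMarshaler from the first comment copies only the payload, and the log line "Received message metadata map[]" shows the message arriving with empty metadata. If the CQRS marshaler relies on metadata to carry the command or event name, a marshaler that drops it could leave the processor unable to match the message to a handler. The sketch below shows the round trip that would need to hold. It uses only the standard library: `natsMsg` and `wmMessage` are hypothetical stand-ins for `nats.Msg` and `message.Message`, and the `X-Message-UUID` header key is made up for illustration.

```go
package main

import "fmt"

// natsMsg is a hypothetical stand-in for nats.Msg.
type natsMsg struct {
	Subject string
	Header  map[string][]string
	Data    []byte
}

// wmMessage is a hypothetical stand-in for Watermill's message.Message.
type wmMessage struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// uuidHeader is an illustrative header key, not the library's actual one.
const uuidHeader = "X-Message-UUID"

// marshal copies the UUID and every metadata entry into headers,
// instead of sending the payload alone.
func marshal(topic string, msg *wmMessage) *natsMsg {
	header := map[string][]string{uuidHeader: {msg.UUID}}
	for k, v := range msg.Metadata {
		header[k] = []string{v}
	}
	return &natsMsg{Subject: topic, Header: header, Data: msg.Payload}
}

// unmarshal restores the UUID and metadata from headers.
func unmarshal(in *natsMsg) *wmMessage {
	out := &wmMessage{Metadata: map[string]string{}, Payload: in.Data}
	for k, values := range in.Header {
		if len(values) == 0 {
			continue
		}
		if k == uuidHeader {
			out.UUID = values[0]
			continue
		}
		out.Metadata[k] = values[0]
	}
	return out
}

func main() {
	sent := &wmMessage{
		UUID:     "42",
		Metadata: map[string]string{"name": "CreateNlpRequest"},
		Payload:  []byte("payload"),
	}
	received := unmarshal(marshal("commands.CreateNlpRequest", sent))
	fmt.Println(received.UUID, received.Metadata["name"], string(received.Payload))
}
```

The final SubscriberConstructor above sets no Unmarshaler at all, so it presumably falls back to the library's default, which would avoid this problem without any custom code.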

@desprit desprit closed this as completed Jan 8, 2025