Skip to content

Commit

Permalink
Add Test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
yash.bansal committed Sep 9, 2024
1 parent d4a3d16 commit 70d2e59
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 6 deletions.
2 changes: 1 addition & 1 deletion message/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Subscriber interface {
// If message processing fails and the message should be redelivered `Nack()` should be called instead.
//
// When the provided ctx is canceled, the subscriber closes the subscription and the output channel.
// The provided ctx is passed to all produced messages.
// The provided ctx is passed to all produced messages (this is configurable for the local Pub/Sub implementation).
// When Nack or Ack is called on the message, the context of the message is canceled.
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
// Close closes all subscriptions with their output channels and flushes offsets etc. when needed.
Expand Down
53 changes: 53 additions & 0 deletions pubsub/gochannel/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ func createPersistentPubSub(t *testing.T) (message.Publisher, message.Subscriber
return pubSub, pubSub
}

func createPersistentPubSubWithContextPreserved(t *testing.T) (message.Publisher, message.Subscriber) {
pubSub := gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: 10000,
Persistent: true,
PreserveContext: true,
},
watermill.NewStdLogger(true, true),
)
return pubSub, pubSub
}

func TestPublishSubscribe_persistent(t *testing.T) {
tests.TestPubSub(
t,
Expand All @@ -44,6 +56,22 @@ func TestPublishSubscribe_persistent(t *testing.T) {
)
}

func TestPublishSubscribe_context_preserved(t *testing.T) {
tests.TestPubSub(
t,
tests.Features{
ConsumerGroups: false,
ExactlyOnceDelivery: true,
GuaranteedOrder: false,
Persistent: false,
RequireSingleInstance: true,
ContextPreserved: true,
},
createPersistentPubSubWithContextPreserved,
nil,
)
}

func TestPublishSubscribe_not_persistent(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
Expand All @@ -63,6 +91,31 @@ func TestPublishSubscribe_not_persistent(t *testing.T) {
assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_not_persistent_with_context(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
gochannel.Config{OutputChannelBuffer: int64(messagesCount), PreserveContext: true},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgs, err := pubSub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

const contextKey = "foo"
sendMessages := tests.PublishSimpleMessagesWithContext(t, messagesCount, contextKey, pubSub, topicName)
receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second)

expectedContexts := make(map[string]context.Context)
for _, msg := range sendMessages {
expectedContexts[msg.UUID] = msg.Context()
}
tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs)
tests.AssertAllMessagesHaveSameContext(t, contextKey, expectedContexts, receivedMsgs)

assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_block_until_ack(t *testing.T) {
pubSub := gochannel.NewGoChannel(
gochannel.Config{BlockPublishUntilSubscriberAck: true},
Expand Down
17 changes: 17 additions & 0 deletions pubsub/tests/test_asserts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"context"
"sort"
"testing"

Expand Down Expand Up @@ -92,3 +93,19 @@ func AssertMessagesMetadata(t *testing.T, key string, expectedValues map[string]

return ok
}

// AssertAllMessagesHaveSameContext checks if context of all received messages is the same as in expectedValues, if PreserveContext is enabled.
func AssertAllMessagesHaveSameContext(t *testing.T, contextKey string, expectedValues map[string]context.Context, received []*message.Message) bool {
assert.Len(t, received, len(expectedValues))

ok := true
for _, msg := range received {
expectedValue := expectedValues[msg.UUID].Value(contextKey)
actualValue := msg.Context().Value(contextKey)
if !assert.Equal(t, expectedValue, actualValue) {
ok = false
}
}

return ok
}
47 changes: 42 additions & 5 deletions pubsub/tests/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type Features struct {

// GenerateTopicFunc overrides standard topic name generation.
GenerateTopicFunc func(tctx TestContext) string

// ContextPreserved should be set to true if the Pub/Sub implementation preserves the context
// of the message when it's published and consumed.
ContextPreserved bool
}

// RunOnlyFastTests returns true if -short flag was provided -race was not provided.
Expand Down Expand Up @@ -993,7 +997,14 @@ func TestSubscribeCtx(
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
publishedMessages := PublishSimpleMessages(t, messagesCount, pub, topicName)

var publishedMessages message.Messages
var contextKeyString = "abc"
if tCtx.Features.ContextPreserved {
publishedMessages = PublishSimpleMessagesWithContext(t, messagesCount, contextKeyString, pub, topicName)
} else {
publishedMessages = PublishSimpleMessages(t, messagesCount, pub, topicName)
}

msgsToCancel, err := sub.Subscribe(ctxWithCancel, topicName)
require.NoError(t, err)
Expand All @@ -1017,15 +1028,23 @@ ClosedLoop:
}

ctx := context.WithValue(context.Background(), contextKey("foo"), "bar")

// For mocking the output of pub-subs where context is preserved vs not preserved
expectedContexts := make(map[string]context.Context)
for _, msg := range publishedMessages {
if tCtx.Features.ContextPreserved {
expectedContexts[msg.UUID] = msg.Context()
} else {
expectedContexts[msg.UUID] = ctx
}
}

msgs, err := sub.Subscribe(ctx, topicName)
require.NoError(t, err)

receivedMessages, _ := bulkRead(tCtx, msgs, messagesCount, defaultTimeout)
AssertAllMessagesReceived(t, publishedMessages, receivedMessages)

for _, msg := range receivedMessages {
assert.EqualValues(t, "bar", msg.Context().Value(contextKey("foo")))
}
AssertAllMessagesHaveSameContext(t, contextKeyString, expectedContexts, receivedMessages)
}

// TestReconnect tests if reconnecting to a Pub/Sub works correctly.
Expand Down Expand Up @@ -1271,6 +1290,24 @@ func PublishSimpleMessages(t *testing.T, messagesCount int, publisher message.Pu
return messagesToPublish
}

// PublishSimpleMessagesWithContext publishes provided number of simple messages without a payload, but custom context
func PublishSimpleMessagesWithContext(t *testing.T, messagesCount int, contextKey string, publisher message.Publisher, topicName string) message.Messages {
var messagesToPublish []*message.Message

for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg.SetContext(context.WithValue(context.Background(), contextKey, "bar"+strconv.Itoa(i)))
messagesToPublish = append(messagesToPublish, msg)

err := publishWithRetry(publisher, topicName, msg)
require.NoError(t, err, "cannot publish messages")
}

return messagesToPublish
}

// AddSimpleMessagesParallel publishes provided number of simple messages without a payload
// using the provided number of publishers (goroutines).
func AddSimpleMessagesParallel(t *testing.T, messagesCount int, publisher message.Publisher, topicName string, publishers int) message.Messages {
Expand Down

0 comments on commit 70d2e59

Please sign in to comment.