Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Azure Service Bus library update #315

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions backends/azure-servicebus/azure-servicebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package azure_servicebus
import (
"context"

servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

Expand All @@ -21,10 +21,12 @@ var (
ErrQueueAndTopic = errors.New("only one topic or queue can be specified")
)

type messageHandlerFunc func(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage) error

type AzureServiceBus struct {
connOpts *opts.ConnectionOptions
connArgs *args.AzureServiceBusConn
client *servicebus.Namespace
client *azservicebus.Client
log *logrus.Entry
}

Expand All @@ -33,7 +35,7 @@ func New(connOpts *opts.ConnectionOptions) (*AzureServiceBus, error) {
return nil, errors.Wrap(err, "invalid connection options")
}

client, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connOpts.GetAzureServiceBus().ConnectionString))
client, err := azservicebus.NewClientFromConnectionString(connOpts.GetAzureServiceBus().ConnectionString, nil)
if err != nil {
return nil, errors.Wrap(err, "unable to create new azure service bus client")
}
Expand Down
3 changes: 2 additions & 1 deletion backends/azure-servicebus/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"fmt"
"time"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/pkg/errors"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

"github.com/batchcorp/plumber/printer"
)

Expand Down
103 changes: 58 additions & 45 deletions backends/azure-servicebus/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"time"

servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"

Expand All @@ -15,7 +15,7 @@ import (
"github.com/batchcorp/plumber/validate"
)

func (a *AzureServiceBus) Read(ctx context.Context, readOpts *opts.ReadOptions, resultsChan chan *records.ReadRecord, errorChan chan *records.ErrorRecord) error {
func (a *AzureServiceBus) Read(ctx context.Context, readOpts *opts.ReadOptions, resultsChan chan *records.ReadRecord, _ chan *records.ErrorRecord) error {
if err := validateReadOptions(readOpts); err != nil {
return errors.Wrap(err, "invalid read options")
}
Expand All @@ -24,7 +24,7 @@ func (a *AzureServiceBus) Read(ctx context.Context, readOpts *opts.ReadOptions,

var count int64

var handler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
var handler messageHandlerFunc = func(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage) error {
count++

serializedMsg, err := json.Marshal(msg)
Expand All @@ -36,75 +36,75 @@ func (a *AzureServiceBus) Read(ctx context.Context, readOpts *opts.ReadOptions,
MessageId: uuid.NewV4().String(),
Num: count,
ReceivedAtUnixTsUtc: time.Now().UTC().Unix(),
Payload: msg.Data,
Payload: msg.Body,
XRaw: serializedMsg,
Record: &records.ReadRecord_AzureServiceBus{
AzureServiceBus: &records.AzureServiceBus{
ContentType: msg.ContentType,
CorrelationId: msg.CorrelationID,
Value: msg.Data,
DeliveryCount: msg.DeliveryCount,
SessionId: util.DerefString(msg.SessionID),
GroupSequence: util.DerefUint32(msg.GroupSequence),
Id: msg.ID,
Label: msg.Label,
ReplyTo: msg.ReplyTo,
ReplyToGroupId: msg.ReplyToGroupID,
To: msg.To,
Ttl: int64(msg.TTL.Seconds()),
LockToken: msg.LockToken.String(),
SystemProperties: makeSystemProperties(msg.SystemProperties),
UserProperties: util.MapInterfaceToString(msg.UserProperties),
Format: msg.Format,
ContentType: util.DerefString(msg.ContentType),
CorrelationId: util.DerefString(msg.CorrelationID),
Value: msg.Body,
DeliveryCount: msg.DeliveryCount,
SessionId: util.DerefString(msg.SessionID),
//GroupSequence: util.DerefUint32(msg.GroupSequence),
Id: msg.MessageID,
//Label: msg.Label,
ReplyTo: util.DerefString(msg.ReplyTo),
//ReplyToGroupId: msg.ReplyToGroupID,
To: util.DerefString(msg.To),
Ttl: int64(msg.TimeToLive.Seconds()),
LockToken: string(msg.LockToken[:]),
SystemProperties: makeSystemProperties(msg),
UserProperties: util.MapInterfaceToString(msg.ApplicationProperties),
//Format: msg.Format,
},
},
}

return msg.Complete(ctx)
return receiver.CompleteMessage(ctx, msg, nil)
}

if readOpts.AzureServiceBus.Args.Queue != "" {
return a.readQueue(ctx, handler, readOpts)
}

if readOpts.AzureServiceBus.Args.Topic != "" {
return a.readTopic(ctx, handler, readOpts)
}

return nil
return a.readTopic(ctx, handler, readOpts)
}

func makeSystemProperties(p *servicebus.SystemProperties) *records.AzureSystemProperties {
func makeSystemProperties(p *azservicebus.ReceivedMessage) *records.AzureSystemProperties {
if p == nil {
return &records.AzureSystemProperties{}
}

return &records.AzureSystemProperties{
LockedUntil: p.LockedUntil.Unix(),
SequenceNumber: util.DerefInt64(p.SequenceNumber),
PartitionId: int32(util.DerefInt16(p.PartitionID)),
LockedUntil: p.LockedUntil.Unix(),
SequenceNumber: util.DerefInt64(p.SequenceNumber),
//PartitionId: int32(util.DerefInt16(p.PartitionID)),
PartitionKey: util.DerefString(p.PartitionKey),
EnqueuedTime: util.DerefTime(p.EnqueuedTime),
DeadLetterSource: util.DerefString(p.DeadLetterSource),
ScheduledEnqueueTime: util.DerefTime(p.ScheduledEnqueueTime),
EnqueuedSequenceNumber: util.DerefInt64(p.EnqueuedSequenceNumber),
ViaPartitionKey: util.DerefString(p.ViaPartitionKey),
Annotations: util.MapInterfaceToString(p.Annotations),
//ViaPartitionKey: util.DerefString(p.ViaPartitionKey),
//Annotations: util.MapInterfaceToString(p.Annotations),
}
}

// readQueue reads messages from an ASB queue
func (a *AzureServiceBus) readQueue(ctx context.Context, handler servicebus.HandlerFunc, readOpts *opts.ReadOptions) error {
queue, err := a.client.NewQueue(readOpts.AzureServiceBus.Args.Queue)
func (a *AzureServiceBus) readQueue(ctx context.Context, handler messageHandlerFunc, readOpts *opts.ReadOptions) error {
receiver, err := a.client.NewReceiverForQueue(readOpts.AzureServiceBus.Args.Queue, nil)
if err != nil {
return errors.Wrap(err, "unable to create new azure service bus queue client")
}

defer queue.Close(ctx)
defer func() {
_ = receiver.Close(ctx)
}()

for {
if err := queue.ReceiveOne(ctx, handler); err != nil {
if err := a.handleMessages(ctx, receiver, handler); err != nil {
return err
}

if !readOpts.Continuous {
return nil
}
Expand All @@ -114,20 +114,18 @@ func (a *AzureServiceBus) readQueue(ctx context.Context, handler servicebus.Hand
}

// readTopic reads messages from an ASB topic using the given subscription name
func (a *AzureServiceBus) readTopic(ctx context.Context, handler servicebus.HandlerFunc, readOpts *opts.ReadOptions) error {
topic, err := a.client.NewTopic(readOpts.AzureServiceBus.Args.Topic)
func (a *AzureServiceBus) readTopic(ctx context.Context, handler messageHandlerFunc, readOpts *opts.ReadOptions) error {
receiver, err := a.client.NewReceiverForSubscription(readOpts.AzureServiceBus.Args.Topic, readOpts.AzureServiceBus.Args.SubscriptionName, nil)
if err != nil {
return errors.Wrap(err, "unable to create topic")
}
sub, err := topic.NewSubscription(readOpts.AzureServiceBus.Args.SubscriptionName)
if err != nil {
return errors.Wrap(err, "unable to create topic subscription")
return errors.Wrap(err, "unable to create new azure service bus subscription client")
}

defer sub.Close(ctx)
defer func() {
_ = receiver.Close(ctx)
}()

for {
if err := sub.ReceiveOne(ctx, handler); err != nil {
if err := a.handleMessages(ctx, receiver, handler); err != nil {
return err
}

Expand All @@ -139,6 +137,21 @@ func (a *AzureServiceBus) readTopic(ctx context.Context, handler servicebus.Hand
return nil
}

func (a *AzureServiceBus) handleMessages(ctx context.Context, receiver *azservicebus.Receiver, handler messageHandlerFunc) error {
messages, err := receiver.ReceiveMessages(ctx, 1, nil)
if err != nil {
return err
}

for i := range messages {
if err = handler(ctx, receiver, messages[i]); err != nil {
return err
}
}

return nil
}

func validateReadOptions(readOpts *opts.ReadOptions) error {
if readOpts == nil {
return validate.ErrMissingReadOptions
Expand Down
79 changes: 43 additions & 36 deletions backends/azure-servicebus/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package azure_servicebus
import (
"context"

servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/pkg/errors"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
Expand All @@ -14,26 +14,26 @@ import (
"github.com/batchcorp/plumber/validate"
)

func (a *AzureServiceBus) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh chan interface{}, errorCh chan<- *records.ErrorRecord) error {
func (a *AzureServiceBus) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh chan interface{}, _ chan<- *records.ErrorRecord) error {
if err := validateRelayOpts(relayOpts); err != nil {
return errors.Wrap(err, "invalid relay options")
}

var handler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
var handler messageHandlerFunc = func(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage) error {
a.log.Debug("Writing message to relay channel")

// This might be nil if no user properties were sent with the original message
if msg.UserProperties == nil {
msg.UserProperties = make(map[string]interface{}, 0)
if msg.ApplicationProperties == nil {
msg.ApplicationProperties = make(map[string]interface{}, 0)
}

// Azure's Message struct does not include this information for some reason
// Seems like it would be good to have. Prefixed with plumber to avoid collisions with user data
if relayOpts.AzureServiceBus.Args.Queue != "" {
msg.UserProperties["plumber_queue"] = relayOpts.AzureServiceBus.Args.Queue
msg.ApplicationProperties["plumber_queue"] = relayOpts.AzureServiceBus.Args.Queue
} else {
msg.UserProperties["plumber_topic"] = relayOpts.AzureServiceBus.Args.Topic
msg.UserProperties["plumber_subscription"] = relayOpts.AzureServiceBus.Args.SubscriptionName
msg.ApplicationProperties["plumber_topic"] = relayOpts.AzureServiceBus.Args.Topic
msg.ApplicationProperties["plumber_subscription"] = relayOpts.AzureServiceBus.Args.SubscriptionName
}

prometheus.Incr("azure-servicebus-relay-consumer", 1)
Expand All @@ -43,8 +43,7 @@ func (a *AzureServiceBus) Relay(ctx context.Context, relayOpts *opts.RelayOption
Options: &types.RelayMessageOptions{},
}

defer msg.Complete(ctx)
return nil
return receiver.CompleteMessage(ctx, msg, nil)
}

if relayOpts.AzureServiceBus.Args.Queue != "" {
Expand All @@ -55,22 +54,18 @@ func (a *AzureServiceBus) Relay(ctx context.Context, relayOpts *opts.RelayOption
}

// relayQueue reads messages from an ASB queue
func (a *AzureServiceBus) relayQueue(ctx context.Context, handler servicebus.HandlerFunc, relayOpts *opts.RelayOptions) error {
queue, err := a.client.NewQueue(relayOpts.AzureServiceBus.Args.Queue)
func (a *AzureServiceBus) relayQueue(ctx context.Context, handler messageHandlerFunc, relayOpts *opts.RelayOptions) error {
receiver, err := a.client.NewReceiverForQueue(relayOpts.AzureServiceBus.Args.Queue, nil)
if err != nil {
return errors.Wrap(err, "unable to create new azure service bus queue client")
}

defer queue.Close(context.Background())
for {
if err := queue.ReceiveOne(ctx, handler); err != nil {
if err == context.Canceled {
a.log.Debug("Received shutdown signal, exiting relayer")
return nil
}

prometheus.IncrPromCounter("plumber_read_errors", 1)
defer func() {
_ = receiver.Close(context.Background())
}()

for {
if err := a.handleRelayMessages(ctx, receiver, handler); err != nil {
return err
}
}
Expand All @@ -79,28 +74,40 @@ func (a *AzureServiceBus) relayQueue(ctx context.Context, handler servicebus.Han
}

// relayTopic reads messages from an ASB topic using the given subscription name
func (a *AzureServiceBus) relayTopic(ctx context.Context, handler servicebus.HandlerFunc, relayOpts *opts.RelayOptions) error {
topic, err := a.client.NewTopic(relayOpts.AzureServiceBus.Args.Topic)
func (a *AzureServiceBus) relayTopic(ctx context.Context, handler messageHandlerFunc, relayOpts *opts.RelayOptions) error {
receiver, err := a.client.NewReceiverForSubscription(relayOpts.AzureServiceBus.Args.Topic, relayOpts.AzureServiceBus.Args.SubscriptionName, nil)
if err != nil {
return errors.Wrap(err, "unable to create new azure service bus topic client")
return errors.Wrap(err, "unable to create new azure service bus subscription client")
}

sub, err := topic.NewSubscription(relayOpts.AzureServiceBus.Args.SubscriptionName)
if err != nil {
return errors.Wrap(err, "unable to create topic subscription")
defer func() {
_ = receiver.Close(context.Background())
}()

for {
if err := a.handleRelayMessages(ctx, receiver, handler); err != nil {
return err
}
}

defer sub.Close(context.Background())
return nil
}

for {
if err := sub.ReceiveOne(ctx, handler); err != nil {
if err == context.Canceled {
a.log.Debug("Received shutdown signal, exiting relayer")
return nil
}
func (a *AzureServiceBus) handleRelayMessages(ctx context.Context, receiver *azservicebus.Receiver, handler messageHandlerFunc) error {
messages, err := receiver.ReceiveMessages(ctx, 1, nil)
if err != nil {
if err == context.Canceled {
a.log.Debug("Received shutdown signal, exiting relayer")
return nil
}

prometheus.IncrPromCounter("plumber_read_errors", 1)

prometheus.IncrPromCounter("plumber_read_errors", 1)
return err
}

for i := range messages {
if err = handler(ctx, receiver, messages[i]); err != nil {
return err
}
}
Expand All @@ -110,7 +117,7 @@ func (a *AzureServiceBus) relayTopic(ctx context.Context, handler servicebus.Han

func validateRelayOpts(relayOpts *opts.RelayOptions) error {
if relayOpts == nil {
return validate.ErrMissingReadOptions
return validate.ErrMissingRelayOptions
}

if relayOpts.AzureServiceBus == nil {
Expand Down
Loading