Skip to content

Commit

Permalink
Fix tests, update changelog and small refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <todormihai@gmail.com>
  • Loading branch information
mihaitodor committed Dec 26, 2024
1 parent 7fc4a28 commit 9fd2d02
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 21 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ All notable changes to this project will be documented in this file.

### Added

- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint (@rockwotj)
- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
- Field `content_type` added to the `amqp_1` output. (@timo102)

### Fixed

Expand Down
4 changes: 1 addition & 3 deletions docs/modules/components/pages/outputs/amqp_1.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,7 @@ Provide a list of explicit metadata key prefixes to be excluded when adding meta
=== `content_type`
Define the message body content type.
The option `string` will transfer the message as an AMQP value of type string. Consider choosing the option `string` if your intention is to transfer UTF-8 string messages (like JSON messages) to the destination.
Specify the message body content type. The option `string` will transfer the message as an AMQP value of type string. Consider choosing the option `string` if your intention is to transfer UTF-8 string messages (like JSON messages) to the destination.
*Type*: `string`
Expand Down
8 changes: 0 additions & 8 deletions internal/impl/amqp1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ const (
contentTypeField = "content_type"
)

// Content Type Options
const (
// Data section with opaque binary data
amqpContentTypeOpaqueBinary = "opaque_binary"
// Single AMQP string value
amqpContentTypeString = "string"
)

// ErrSASLMechanismNotSupported is returned if a SASL mechanism was not recognised.
type ErrSASLMechanismNotSupported string

Expand Down
8 changes: 7 additions & 1 deletion internal/impl/amqp1/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ func TestIntegrationAMQP1(t *testing.T) {
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.Run("rmohr/activemq", "latest", nil)
resource, err := pool.Run("apache/activemq-classic",
"latest",
[]string{
"ACTIVEMQ_CONNECTION_USER=guest",
"ACTIVEMQ_CONNECTION_PASSWORD=guest",
},
)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
Expand Down
27 changes: 19 additions & 8 deletions internal/impl/amqp1/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"
)

type amqpContentType string

const (
// Data section with opaque binary data
amqpContentTypeOpaqueBinary amqpContentType = "opaque_binary"
// Single AMQP string value
amqpContentTypeString amqpContentType = "string"
)

func amqp1OutputSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Stable().
Expand Down Expand Up @@ -69,10 +78,10 @@ This output benefits from sending multiple messages in flight in parallel for im
service.NewMetadataExcludeFilterField(metaFilterField).
Description("Specify criteria for which metadata values are attached to messages as headers."),
service.NewStringEnumField(contentTypeField,
amqpContentTypeOpaqueBinary, amqpContentTypeString).
Description("Define the message body content type.\n\nThe option `string` will transfer the message as an AMQP value of type string. Consider choosing the option `string` if your intention is to transfer UTF-8 string messages (like JSON messages) to the destination.").
string(amqpContentTypeOpaqueBinary), string(amqpContentTypeString)).
Description("Specify the message body content type. The option `string` will transfer the message as an AMQP value of type string. Consider choosing the option `string` if your intention is to transfer UTF-8 string messages (like JSON messages) to the destination.").
Advanced().
Default(amqpContentTypeOpaqueBinary),
Default(string(amqpContentTypeOpaqueBinary)),
).LintRule(`
root = if this.url.or("") == "" && this.urls.or([]).length() == 0 {
"field 'urls' must be set"
Expand Down Expand Up @@ -110,7 +119,7 @@ type amqp1Writer struct {
metaFilter *service.MetadataExcludeFilter
applicationPropertiesMap *bloblang.Executor
connOpts *amqp.ConnOptions
contentType string
contentType amqpContentType

log *service.Logger
connLock sync.RWMutex
Expand Down Expand Up @@ -171,8 +180,10 @@ func amqp1WriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
return nil, err
}

if a.contentType, err = conf.FieldString(contentTypeField); err != nil {
if contentType, err := conf.FieldString(contentTypeField); err != nil {
return nil, err
} else {
a.contentType = amqpContentType(contentType)
}

return &a, nil
Expand Down Expand Up @@ -259,12 +270,12 @@ func (a *amqp1Writer) Write(ctx context.Context, msg *service.Message) error {
return err
}

m := &amqp.Message{}

var m *amqp.Message
switch a.contentType {
case amqpContentTypeOpaqueBinary:
m.Data = [][]byte{mBytes}
m = amqp.NewMessage(mBytes)
case amqpContentTypeString:
m = &amqp.Message{}
m.Value = string(mBytes)
default:
return fmt.Errorf("invalid content type specified: %s", a.contentType)
Expand Down

0 comments on commit 9fd2d02

Please sign in to comment.