Skip to content

Commit

Permalink
Merge pull request #3098 from redpanda-data/mihaitodor-amqp-1-content…
Browse files Browse the repository at this point in the history
…-type-support

AMQP 1.0 Output - Support for message body type
  • Loading branch information
mihaitodor authored Dec 26, 2024
2 parents 0a00e27 + 9fd2d02 commit 21a9d97
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ All notable changes to this project will be documented in this file.

### Added

- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint (@rockwotj)
- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
- Field `content_type` added to the `amqp_1` output. (@timo102)

### Fixed

Expand Down
15 changes: 15 additions & 0 deletions docs/modules/components/pages/outputs/amqp_1.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ output:
password: ""
metadata:
exclude_prefixes: []
content_type: opaque_binary
```
--
Expand Down Expand Up @@ -385,4 +386,18 @@ Provide a list of explicit metadata key prefixes to be excluded when adding meta
*Default*: `[]`
=== `content_type`
Specify the message body content type. The option `string` will transfer the message as an AMQP value of type string. Consider choosing the option `string` if your intention is to transfer UTF-8 string messages (like JSON messages) to the destination.
*Type*: `string`
*Default*: `"opaque_binary"`
Options:
`opaque_binary`
, `string`
.
1 change: 1 addition & 0 deletions internal/impl/amqp1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
targetAddrField = "target_address"
appPropsMapField = "application_properties_map"
metaFilterField = "metadata"
contentTypeField = "content_type"
)

// ErrSASLMechanismNotSupported is returned if a SASL mechanism was not recognised.
Expand Down
27 changes: 26 additions & 1 deletion internal/impl/amqp1/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ func TestIntegrationAMQP1(t *testing.T) {
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.Run("rmohr/activemq", "latest", nil)
resource, err := pool.Run("apache/activemq-classic",
"latest",
[]string{
"ACTIVEMQ_CONNECTION_USER=guest",
"ACTIVEMQ_CONNECTION_PASSWORD=guest",
},
)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
Expand Down Expand Up @@ -90,6 +96,21 @@ input:
source_address: "queue:/$ID"
`

templateWithContentTypeString := `
output:
amqp_1:
url: amqp://guest:guest@localhost:$PORT/
target_address: "queue:/$ID"
max_in_flight: $MAX_IN_FLIGHT
content_type: "string"
metadata:
exclude_prefixes: [ $OUTPUT_META_EXCLUDE_PREFIX ]
input:
amqp_1:
url: amqp://guest:guest@localhost:$PORT/
source_address: "queue:/$ID"
`

testcases := []struct {
label string
template string
Expand All @@ -102,6 +123,10 @@ input:
label: "should handle new field urls",
template: templateWithFieldURLS,
},
{
label: "should handle content type string",
template: templateWithContentTypeString,
},
}

for _, tc := range testcases {
Expand Down
33 changes: 32 additions & 1 deletion internal/impl/amqp1/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"
)

type amqpContentType string

const (
// Data section with opaque binary data
amqpContentTypeOpaqueBinary amqpContentType = "opaque_binary"
// Single AMQP string value
amqpContentTypeString amqpContentType = "string"
)

func amqp1OutputSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Stable().
Expand Down Expand Up @@ -68,6 +77,11 @@ This output benefits from sending multiple messages in flight in parallel for im
saslFieldSpec(),
service.NewMetadataExcludeFilterField(metaFilterField).
Description("Specify criteria for which metadata values are attached to messages as headers."),
service.NewStringEnumField(contentTypeField,
string(amqpContentTypeOpaqueBinary), string(amqpContentTypeString)).
Description("Specify the message body content type. The option `string` will transfer the message as an AMQP value of type string. Consider choosing the option `string` if your intention is to transfer UTF-8 string messages (like JSON messages) to the destination.").
Advanced().
Default(string(amqpContentTypeOpaqueBinary)),
).LintRule(`
root = if this.url.or("") == "" && this.urls.or([]).length() == 0 {
"field 'urls' must be set"
Expand Down Expand Up @@ -105,6 +119,7 @@ type amqp1Writer struct {
metaFilter *service.MetadataExcludeFilter
applicationPropertiesMap *bloblang.Executor
connOpts *amqp.ConnOptions
contentType amqpContentType

log *service.Logger
connLock sync.RWMutex
Expand Down Expand Up @@ -164,6 +179,13 @@ func amqp1WriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
if a.metaFilter, err = conf.FieldMetadataExcludeFilter(metaFilterField); err != nil {
return nil, err
}

if contentType, err := conf.FieldString(contentTypeField); err != nil {
return nil, err
} else {
a.contentType = amqpContentType(contentType)
}

return &a, nil
}

Expand Down Expand Up @@ -248,7 +270,16 @@ func (a *amqp1Writer) Write(ctx context.Context, msg *service.Message) error {
return err
}

m := amqp.NewMessage(mBytes)
var m *amqp.Message
switch a.contentType {
case amqpContentTypeOpaqueBinary:
m = amqp.NewMessage(mBytes)
case amqpContentTypeString:
m = &amqp.Message{}
m.Value = string(mBytes)
default:
return fmt.Errorf("invalid content type specified: %s", a.contentType)
}

if a.applicationPropertiesMap != nil {
mapMsg, err := msg.BloblangQuery(a.applicationPropertiesMap)
Expand Down

0 comments on commit 21a9d97

Please sign in to comment.