diff --git a/go.mod b/go.mod index 6efd1fea1d..65ea6834eb 100644 --- a/go.mod +++ b/go.mod @@ -130,8 +130,8 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20211228015320-b4f792c43cd0 - github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a - go.mongodb.org/mongo-driver v1.16.1 + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 + go.mongodb.org/mongo-driver/v2 v2.0.0 go.nanomsg.org/mangos/v3 v3.4.2 go.opentelemetry.io/otel v1.29.0 go.opentelemetry.io/otel/exporters/jaeger v1.17.0 @@ -345,7 +345,6 @@ require ( github.com/moby/sys/sequential v0.5.0 // indirect github.com/moby/sys/user v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect - github.com/montanaflynn/stats v0.7.1 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/nats-io/nats-server/v2 v2.9.23 // indirect diff --git a/go.sum b/go.sum index 8441215232..63760c1868 100644 --- a/go.sum +++ b/go.sum @@ -1572,8 +1572,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= -github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= -github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= @@ -1919,8 +1917,8 @@ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBi github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1941,8 +1939,8 @@ go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= -go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4l8= -go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw= +go.mongodb.org/mongo-driver/v2 v2.0.0 h1:Jfd7XpdZa9yk3eY774bO7SWVb30noLSirL9nKTpavhI= +go.mongodb.org/mongo-driver/v2 v2.0.0/go.mod h1:nSjmNq4JUstE8IRZKTktLgMHM4F1fccL6HGX1yh+8RA= go.nanomsg.org/mangos/v3 v3.4.2 h1:gHlopxjWvJcVCcUilQIsRQk9jdj6/HB7wrTiUN8Ki7Q= go.nanomsg.org/mangos/v3 v3.4.2/go.mod h1:8+hjBMQub6HvXmuGvIq6hf19uxGQIjCofmc62lbedLA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -2014,7 +2012,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/impl/mongodb/cache.go b/internal/impl/mongodb/cache.go index bac2c1aaca..2d94d8a8c8 100644 --- a/internal/impl/mongodb/cache.go +++ b/internal/impl/mongodb/cache.go @@ -19,9 +19,9 @@ import ( "fmt" "time" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/redpanda-data/benthos/v4/public/service" ) @@ -114,7 +114,7 @@ func (m *mongodbCache) Get(ctx context.Context, key string) ([]byte, error) { } func (m *mongodbCache) Set(ctx context.Context, key string, value []byte, _ *time.Duration) error { - opts := options.Update().SetUpsert(true) + opts := options.UpdateOne().SetUpsert(true) filter := bson.M{m.keyField: key} update := bson.M{"$set": bson.M{m.valueField: string(value)}} diff --git a/internal/impl/mongodb/common.go b/internal/impl/mongodb/common.go index 76bf750c06..f83f79fa4c 100644 --- a/internal/impl/mongodb/common.go +++ b/internal/impl/mongodb/common.go @@ -15,16 +15,15 @@ package mongodb import ( - "context" "errors" "fmt" "strconv" "time" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/writeconcern" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/writeconcern" "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" @@ -93,7 +92,7 @@ func getClient(parsedConf *service.ParsedConfig) (client *mongo.Client, database opt := options.Client(). SetConnectTimeout(10 * time.Second). - SetSocketTimeout(30 * time.Second). + SetTimeout(30 * time.Second). SetServerSelectionTimeout(30 * time.Second). ApplyURI(url). SetAppName(appName) @@ -106,10 +105,7 @@ func getClient(parsedConf *service.ParsedConfig) (client *mongo.Client, database opt.SetAuth(creds) } - ctx, done := context.WithTimeout(context.Background(), time.Minute) - defer done() - - if client, err = mongo.Connect(ctx, opt); err != nil { + if client, err = mongo.Connect(opt); err != nil { return } @@ -275,7 +271,7 @@ func writeConcernDocs() *service.ConfigField { ).Description("The write concern settings for the mongo connection.") } -func writeConcernCollectionOptionFromParsed(pConf *service.ParsedConfig) (opt *options.CollectionOptions, err error) { +func writeConcernSpecFromParsed(pConf *service.ParsedConfig) (spec *writeConcernSpec, err error) { pConf = pConf.Namespace(commonFieldWriteConcern) var w string @@ -296,8 +292,7 @@ func writeConcernCollectionOptionFromParsed(pConf *service.ParsedConfig) (opt *o } writeConcern := &writeconcern.WriteConcern{ - Journal: &j, - WTimeout: wTimeout, + Journal: &j, } if wInt, err := strconv.Atoi(w); err != nil { writeConcern.W = w @@ -305,7 +300,15 @@ func writeConcernCollectionOptionFromParsed(pConf *service.ParsedConfig) (opt *o writeConcern.W = wInt } - return options.Collection().SetWriteConcern(writeConcern), nil + return &writeConcernSpec{ + options: options.Collection().SetWriteConcern(writeConcern), + wTimeout: wTimeout, + }, nil +} + +type writeConcernSpec struct { + options *options.CollectionOptionsBuilder + wTimeout time.Duration } //------------------------------------------------------------------------------ diff --git a/internal/impl/mongodb/input.go b/internal/impl/mongodb/input.go index 0dab431702..1f672592b9 100644 --- a/internal/impl/mongodb/input.go +++ b/internal/impl/mongodb/input.go @@ -19,9 +19,9 @@ import ( "errors" "fmt" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/redpanda-data/benthos/v4/public/service" ) @@ -180,15 +180,13 @@ func (m *mongoInput) Connect(ctx context.Context) error { collection := m.database.Collection(m.collection) switch m.operation { case "find": - var findOptions *options.FindOptions - findOptions, err = m.getFindOptions() + findOptions, err := m.getFindOptions() if err != nil { return fmt.Errorf("error parsing 'find' options: %v", err) } m.cursor, err = collection.Find(ctx, m.query, findOptions) case "aggregate": - var aggregateOptions *options.AggregateOptions - aggregateOptions, err = m.getAggregateOptions() + aggregateOptions, err := m.getAggregateOptions() if err != nil { return fmt.Errorf("error parsing 'aggregate' options: %v", err) } @@ -245,7 +243,7 @@ func (m *mongoInput) Close(ctx context.Context) error { return nil } -func (m *mongoInput) getFindOptions() (*options.FindOptions, error) { +func (m *mongoInput) getFindOptions() (*options.FindOptionsBuilder, error) { findOptions := options.Find() if m.batchSize > 0 { findOptions.SetBatchSize(m.batchSize) @@ -259,7 +257,7 @@ func (m *mongoInput) getFindOptions() (*options.FindOptions, error) { return findOptions, nil } -func (m *mongoInput) getAggregateOptions() (*options.AggregateOptions, error) { +func (m *mongoInput) getAggregateOptions() (*options.AggregateOptionsBuilder, error) { aggregateOptions := options.Aggregate() if m.batchSize > 0 { aggregateOptions.SetBatchSize(m.batchSize) diff --git a/internal/impl/mongodb/input_test.go b/internal/impl/mongodb/input_test.go index 37c9e7095f..eca6e74df4 100644 --- a/internal/impl/mongodb/input_test.go +++ b/internal/impl/mongodb/input_test.go @@ -22,9 +22,9 @@ import ( "github.com/ory/dockertest/v3" "github.com/stretchr/testify/assert" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/stretchr/testify/require" @@ -85,15 +85,15 @@ func TestInputIntegration(t *testing.T) { dbName := "TestDB" collName := "TestCollection" require.NoError(t, pool.Retry(func() error { - if mongoClient, err = mongo.Connect(context.Background(), options.Client(). - SetConnectTimeout(10*time.Second). - SetSocketTimeout(30*time.Second). - SetServerSelectionTimeout(30*time.Second). + if mongoClient, err = mongo.Connect(options.Client(). + SetConnectTimeout(10 * time.Second). + SetTimeout(30 * time.Second). + SetServerSelectionTimeout(30 * time.Second). SetAuth(options.Credential{ Username: "mongoadmin", Password: "secret", }). - ApplyURI("mongodb://localhost:"+resource.GetPort("27017/tcp"))); err != nil { + ApplyURI("mongodb://localhost:" + resource.GetPort("27017/tcp"))); err != nil { return err } if err := mongoClient.Database(dbName).CreateCollection(context.Background(), collName); err != nil { @@ -151,12 +151,9 @@ func TestInputIntegration(t *testing.T) { "age": bson.M{ "$gte": 18, }, - }, &options.FindOptions{ - Sort: bson.M{ - "name": 1, - }, - Limit: &limit, - }) + }, options.Find(). + SetSort(bson.M{"name": 1}). + SetLimit(limit)) }, placeholderConf: ` url: "mongodb://localhost:%s" @@ -244,7 +241,7 @@ func testInput( t.Helper() controlCtx := context.Background() - controlConn, err := mongo.Connect(controlCtx, options.Client().ApplyURI("mongodb://mongoadmin:secret@localhost:"+port)) + controlConn, err := mongo.Connect(options.Client().ApplyURI("mongodb://mongoadmin:secret@localhost:" + port)) require.NoError(t, err) controlColl := controlConn.Database("TestDB").Collection("TestCollection") controlCur, err := controlQuery(controlColl) diff --git a/internal/impl/mongodb/integration_test.go b/internal/impl/mongodb/integration_test.go index 361150ffbc..129ce1b6aa 100644 --- a/internal/impl/mongodb/integration_test.go +++ b/internal/impl/mongodb/integration_test.go @@ -25,9 +25,9 @@ import ( "github.com/ory/dockertest/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/redpanda-data/benthos/v4/public/service/integration" ) @@ -64,15 +64,15 @@ func TestIntegrationMongoDB(t *testing.T) { _ = resource.Expire(900) require.NoError(t, pool.Retry(func() error { - mongoClient, err = mongo.Connect(context.Background(), options.Client(). - SetConnectTimeout(10*time.Second). - SetSocketTimeout(30*time.Second). - SetServerSelectionTimeout(30*time.Second). + mongoClient, err = mongo.Connect(options.Client(). + SetConnectTimeout(10 * time.Second). + SetTimeout(30 * time.Second). + SetServerSelectionTimeout(30 * time.Second). SetAuth(options.Credential{ Username: "mongoadmin", Password: "secret", }). - ApplyURI("mongodb://localhost:"+resource.GetPort("27017/tcp"))) + ApplyURI("mongodb://localhost:" + resource.GetPort("27017/tcp"))) return err })) diff --git a/internal/impl/mongodb/output.go b/internal/impl/mongodb/output.go index 2bc90eaf26..7c9875a3e0 100644 --- a/internal/impl/mongodb/output.go +++ b/internal/impl/mongodb/output.go @@ -20,8 +20,7 @@ import ( "fmt" "sync" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo" "github.com/redpanda-data/benthos/v4/public/service" @@ -83,12 +82,12 @@ func init() { type outputWriter struct { log *service.Logger - client *mongo.Client - database *mongo.Database - collection *service.InterpolatedString - writeConcernCollectionOption *options.CollectionOptions - operation Operation - writeMaps writeMaps + client *mongo.Client + database *mongo.Database + collection *service.InterpolatedString + writeConcernSpec *writeConcernSpec + operation Operation + writeMaps writeMaps mu sync.Mutex } @@ -103,7 +102,7 @@ func newOutputWriter(conf *service.ParsedConfig, res *service.Resources) (db *ou if db.collection, err = conf.FieldInterpolatedString(moFieldCollection); err != nil { return } - if db.writeConcernCollectionOption, err = writeConcernCollectionOptionFromParsed(conf); err != nil { + if db.writeConcernSpec, err = writeConcernSpecFromParsed(conf); err != nil { return } if db.operation, err = operationFromParsed(conf); err != nil { @@ -201,9 +200,7 @@ func (m *outputWriter) WriteBatch(ctx context.Context, batch service.MessageBatc // Dispatch any documents which WalkWithBatchedErrors managed to process successfully if len(writeModelsMap) > 0 { for collectionStr, writeModels := range writeModelsMap { - // We should have at least one write model in the slice - collection := m.database.Collection(collectionStr, m.writeConcernCollectionOption) - if _, err := collection.BulkWrite(ctx, writeModels); err != nil { + if err := m.builkWrite(ctx, collectionStr, writeModels); err != nil { return err } } @@ -216,6 +213,16 @@ func (m *outputWriter) WriteBatch(ctx context.Context, batch service.MessageBatc return nil } +func (m *outputWriter) builkWrite(ctx context.Context, collectionStr string, writeModels []mongo.WriteModel) error { + ctx, cancel := context.WithTimeout(ctx, m.writeConcernSpec.wTimeout) + defer cancel() + + // We should have at least one write model in the slice + collection := m.database.Collection(collectionStr, m.writeConcernSpec.options) + _, err := collection.BulkWrite(ctx, writeModels) + return err +} + func (m *outputWriter) Close(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/impl/mongodb/processor.go b/internal/impl/mongodb/processor.go index 08b3507ed7..4dffc86431 100644 --- a/internal/impl/mongodb/processor.go +++ b/internal/impl/mongodb/processor.go @@ -19,9 +19,9 @@ import ( "errors" "fmt" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/redpanda-data/benthos/v4/public/service" @@ -81,12 +81,12 @@ func init() { type Processor struct { log *service.Logger - client *mongo.Client - database *mongo.Database - collection *service.InterpolatedString - writeConcernCollectionOption *options.CollectionOptions - operation Operation - writeMaps writeMaps + client *mongo.Client + database *mongo.Database + collection *service.InterpolatedString + writeConcernSpec *writeConcernSpec + operation Operation + writeMaps writeMaps marshalMode JSONMarshalMode } @@ -102,7 +102,7 @@ func ProcessorFromParsed(conf *service.ParsedConfig, res *service.Resources) (mp if mp.collection, err = conf.FieldInterpolatedString(mpFieldCollection); err != nil { return } - if mp.writeConcernCollectionOption, err = writeConcernCollectionOptionFromParsed(conf); err != nil { + if mp.writeConcernSpec, err = writeConcernSpecFromParsed(conf); err != nil { return } if mp.operation, err = operationFromParsed(conf); err != nil { @@ -147,9 +147,9 @@ func (m *Processor) ProcessBatch(ctx context.Context, batch service.MessageBatch return err } - findOptions := &options.FindOneOptions{} + findOptions := options.FindOne() if hintJSON != nil { - findOptions.Hint = hintJSON + findOptions.SetHint(hintJSON) } collectionStr, err := batch.TryInterpolatedString(i, m.collection) @@ -188,10 +188,13 @@ func (m *Processor) ProcessBatch(ctx context.Context, batch service.MessageBatch Hint: hintJSON, } case OperationFindOne: - collection := m.database.Collection(collectionStr, m.writeConcernCollectionOption) + ctx, cancel := context.WithTimeout(context.Background(), m.writeConcernSpec.wTimeout) + defer cancel() + + collection := m.database.Collection(collectionStr, m.writeConcernSpec.options) var decoded any - if err = collection.FindOne(context.Background(), filterJSON, findOptions).Decode(&decoded); err != nil { + if err = collection.FindOne(ctx, filterJSON, findOptions).Decode(&decoded); err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return err } @@ -219,21 +222,28 @@ func (m *Processor) ProcessBatch(ctx context.Context, batch service.MessageBatch if len(writeModelsMap) > 0 { for collectionStr, msAndMs := range writeModelsMap { - collection := m.database.Collection(collectionStr, m.writeConcernCollectionOption) - - // We should have at least one write model in the slice - if _, err := collection.BulkWrite(ctx, msAndMs.ws); err != nil { - m.log.Errorf("Bulk write failed in mongodb processor: %v", err) - for _, msg := range msAndMs.msgs { - msg.SetError(err) - } - } + m.bulkWrite(ctx, collectionStr, &msAndMs) } } return []service.MessageBatch{batch}, nil } +func (m *Processor) bulkWrite(ctx context.Context, collectionStr string, msgsAndModels *msgsAndModels) { + ctx, cancel := context.WithTimeout(ctx, m.writeConcernSpec.wTimeout) + defer cancel() + + collection := m.database.Collection(collectionStr, m.writeConcernSpec.options) + + // We should have at least one write model in the slice + if _, err := collection.BulkWrite(ctx, msgsAndModels.ws); err != nil { + m.log.Errorf("Bulk write failed in mongodb processor: %v", err) + for _, msg := range msgsAndModels.msgs { + msg.SetError(err) + } + } +} + // Close the connection to mongodb. func (m *Processor) Close(ctx context.Context) error { return m.client.Disconnect(ctx) diff --git a/internal/impl/mongodb/processor_test.go b/internal/impl/mongodb/processor_test.go index f586db8761..8e3710c6dc 100644 --- a/internal/impl/mongodb/processor_test.go +++ b/internal/impl/mongodb/processor_test.go @@ -24,9 +24,9 @@ import ( "github.com/ory/dockertest/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/benthos/v4/public/service/integration" @@ -60,15 +60,15 @@ func TestProcessorIntegration(t *testing.T) { var mongoClient *mongo.Client require.NoError(t, pool.Retry(func() error { - mongoClient, err = mongo.Connect(context.Background(), options.Client(). - SetConnectTimeout(10*time.Second). - SetSocketTimeout(30*time.Second). - SetServerSelectionTimeout(30*time.Second). + mongoClient, err = mongo.Connect(options.Client(). + SetConnectTimeout(10 * time.Second). + SetTimeout(30 * time.Second). + SetServerSelectionTimeout(30 * time.Second). SetAuth(options.Credential{ Username: "mongoadmin", Password: "secret", }). - ApplyURI("mongodb://localhost:"+resource.GetPort("27017/tcp"))) + ApplyURI("mongodb://localhost:" + resource.GetPort("27017/tcp"))) if err != nil { return err }