Skip to content

Commit 15f2efe

Browse files
jpaskhayzhonghui12
authored andcommitted
add timeout config for AWS SDK Go HTTP calls
- also refactored repetitive non-negative integer config parsing based on feedback
1 parent 58916f8 commit 15f2efe

File tree

3 files changed

+37
-14
lines changed

3 files changed

+37
-14
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ If you think you’ve found a potential security issue, please do not post it in
2525
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature changes the behavior of the `partition_key` feature. See the KPL aggregation section below for more details.
2626
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed.
2727
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.
28+
* `http_request_timeout`: Specify a timeout (in seconds) for the underlying AWS SDK Go HTTP call when sending records to Kinesis. By default, a timeout of `0` is used, indicating no timeout. Note that even with no timeout, the default behavior of the AWS SDK Go library may still lead to an eventual timeout.
2829

2930
### Permissions
3031

fluent-bit-kinesis.go

+26-11
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
8989
logrus.Infof("[kinesis %d] plugin parameter compression = '%s'", pluginID, compression)
9090
replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
9191
logrus.Infof("[kinesis %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)
92+
httpRequestTimeout := output.FLBPluginConfigKey(ctx, "http_request_timeout")
93+
logrus.Infof("[kinesis %d] plugin parameter http_request_timeout = '%s'", pluginID, httpRequestTimeout)
9294

9395
if stream == "" || region == "" {
9496
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
@@ -119,14 +121,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
119121
var concurrencyInt, concurrencyRetriesInt int
120122
var err error
121123
if concurrency != "" {
122-
concurrencyInt, err = strconv.Atoi(concurrency)
124+
concurrencyInt, err = parseNonNegativeConfig("experimental_concurrency", concurrency, pluginID)
123125
if err != nil {
124-
logrus.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value %s specified: %v", pluginID, concurrency, err)
125126
return nil, err
126127
}
127-
if concurrencyInt < 0 {
128-
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
129-
}
130128

131129
if concurrencyInt > maximumConcurrency {
132130
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
@@ -138,12 +136,9 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
138136
}
139137

140138
if concurrencyRetries != "" {
141-
concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries)
139+
concurrencyRetriesInt, err = parseNonNegativeConfig("experimental_concurrency_retries", concurrencyRetries, pluginID)
142140
if err != nil {
143-
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err)
144-
}
145-
if concurrencyRetriesInt < 0 {
146-
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
141+
return nil, err
147142
}
148143
} else {
149144
concurrencyRetriesInt = defaultConcurrentRetries
@@ -160,7 +155,27 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
160155
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
161156
}
162157

163-
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
158+
var httpRequestTimeoutDuration time.Duration
159+
if httpRequestTimeout != "" {
160+
httpRequestTimeoutInt, err := parseNonNegativeConfig("http_request_timeout", httpRequestTimeout, pluginID)
161+
if err != nil {
162+
return nil, err
163+
}
164+
httpRequestTimeoutDuration = time.Duration(httpRequestTimeoutInt) * time.Second
165+
}
166+
167+
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID, httpRequestTimeoutDuration)
168+
}
169+
170+
func parseNonNegativeConfig(configName string, configValue string, pluginID int) (int, error) {
171+
configValueInt, err := strconv.Atoi(configValue)
172+
if err != nil {
173+
return 0, fmt.Errorf("[kinesis %d] Invalid '%s' value (%s) specified: %v", pluginID, configName, configValue, err)
174+
}
175+
if configValueInt < 0 {
176+
return 0, fmt.Errorf("[kinesis %d] Invalid '%s' value (%s) specified, must be a non-negative number", pluginID, configName, configValue)
177+
}
178+
return configValueInt, nil
164179
}
165180

166181
// The "export" comments have syntactic meaning

kinesis/kinesis.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"errors"
2424
"fmt"
2525
"math"
26+
"net/http"
2627
"os"
2728
"strings"
2829
"sync/atomic"
@@ -114,8 +115,8 @@ type OutputPlugin struct {
114115
}
115116

116117
// NewOutputPlugin creates an OutputPlugin object
117-
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
118-
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint, pluginID)
118+
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int, httpRequestTimeout time.Duration) (*OutputPlugin, error) {
119+
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint, pluginID, httpRequestTimeout)
119120
if err != nil {
120121
return nil, err
121122
}
@@ -171,7 +172,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd
171172
}
172173

173174
// newPutRecordsClient creates the Kinesis client for calling the PutRecords method
174-
func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint string, stsEndpoint string, pluginID int) (*kinesis.Kinesis, error) {
175+
func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint string, stsEndpoint string, pluginID int, httpRequestTimeout time.Duration) (*kinesis.Kinesis, error) {
175176
customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
176177
if service == endpoints.KinesisServiceID && kinesisEndpoint != "" {
177178
return endpoints.ResolvedEndpoint{
@@ -184,12 +185,16 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
184185
}
185186
return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
186187
}
188+
httpClient := &http.Client{
189+
Timeout: httpRequestTimeout,
190+
}
187191

188192
// Fetch base credentials
189193
baseConfig := &aws.Config{
190194
Region: aws.String(awsRegion),
191195
EndpointResolver: endpoints.ResolverFunc(customResolverFn),
192196
CredentialsChainVerboseErrors: aws.Bool(true),
197+
HTTPClient: httpClient,
193198
}
194199

195200
sess, err := session.NewSession(baseConfig)
@@ -206,6 +211,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
206211
creds := stscreds.NewCredentials(svcSess, eksRole)
207212
eksConfig.Credentials = creds
208213
eksConfig.Region = aws.String(awsRegion)
214+
eksConfig.HTTPClient = httpClient
209215
svcConfig = eksConfig
210216

211217
svcSess, err = session.NewSession(svcConfig)
@@ -219,6 +225,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
219225
creds := stscreds.NewCredentials(svcSess, roleARN)
220226
stsConfig.Credentials = creds
221227
stsConfig.Region = aws.String(awsRegion)
228+
stsConfig.HTTPClient = httpClient
222229
svcConfig = stsConfig
223230

224231
svcSess, err = session.NewSession(svcConfig)

0 commit comments

Comments
 (0)