Skip to content

Commit

Permalink
in_kafka: make pull timeout configurable
Browse files Browse the repository at this point in the history
Signed-off-by: CoreidCC <sws-github@coreid.cc>
  • Loading branch information
coreidcc committed Jan 6, 2025
1 parent 134285e commit 05bb47f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
24 changes: 16 additions & 8 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ static int in_kafka_collect(struct flb_input_instance *ins,


if(!ctx->enable_auto_commit) {
/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
}
}

/* Break from the loop when reaching the limit of polling if available */
Expand Down Expand Up @@ -225,6 +226,7 @@ static int in_kafka_init(struct flb_input_instance *ins,
char errstr[512];
(void) data;
char conf_val[16];
size_t dsize;

/* Allocate space for the configuration context */
ctx = flb_malloc(sizeof(struct flb_in_kafka_config));
Expand Down Expand Up @@ -252,13 +254,20 @@ static int in_kafka_init(struct flb_input_instance *ins,
* -> minimize the delay we might create
* b) run in our own thread:
* -> optimize for throuput and relay on 'fetch.wait.max.ms'
* which is set to 500 by default default. lets set it to
* twice that so that increasing fetch.wait.max.ms still
* has an effect.
* which is set to 500 by default default. wa algin our
* timeout with what is set for 'fetch.wait.max.ms'
*/
ctx->poll_timeount_ms = 1;
if(ins->is_threaded) {
ctx->poll_timeount_ms = 1000;
if (ins->is_threaded) {
ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout

// align our timeout with what was configured for fetch.wait.max.ms
dsize = sizeof(conf_val);
res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize);
if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) {
// add 50ms so kafa triggers timout
ctx->poll_timeount_ms = atoi(conf_val) + 50;
}
}

if (ctx->buffer_max_size > 0) {
Expand Down Expand Up @@ -451,7 +460,6 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit),
"Rely on kafka auto-commit and commit messages in batches"
},
/* EOF */
{0}
};

Expand Down
1 change: 1 addition & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#define FLB_IN_KAFKA_UNLIMITED (size_t)-1
#define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M"
#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false"
#define FLB_IN_KAFKA_POLL_TIMEOUT_MS "550" // same as kafka fetch.wait.max.ms + 10%

enum {
FLB_IN_KAFKA_FORMAT_NONE,
Expand Down

0 comments on commit 05bb47f

Please sign in to comment.