out_s3: Add parquet compression type with pure C #10691
Conversation
Walkthrough

Adds Parquet compression support gated by Arrow GLib Parquet detection, extends S3 output to treat any non-NONE compression uniformly, enforces put-object for Arrow/Parquet, updates build scripts and CI to install Arrow/Parquet GLib, and exposes a new compressor API plus option wiring.
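The summary mentions a new compressor entry point but does not show its prototype. As a rough illustration only (the function name out_s3_compress_parquet appears in the sequence diagram below; the parameter list here is an assumption, not the PR's actual declaration), such an entry point could look like this:

```c
/*
 * Hypothetical prototype for illustration; the real declaration in
 * src/aws/compression/arrow/compress.c may differ. The sketched contract:
 * take serialized records, return a newly allocated Parquet-encoded buffer
 * that the caller releases with flb_free().
 */
int out_s3_compress_parquet(const char *records, size_t records_size,
                            void **out_buf, size_t *out_size);
```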
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant User as User/Config
participant S3 as S3 Plugin
participant Comp as Compressor (gzip/arrow/parquet)
participant AWS as Amazon S3
User->>S3: Configure compression=(none|gzip|arrow|parquet), use_put_object
alt compression in {arrow, parquet}
S3->>S3: Verify use_put_object == true
alt use_put_object false
S3-->>User: Error: require put-object for Arrow/Parquet
note right of S3: Abort send
else use_put_object true
S3->>Comp: Compress records
alt parquet
note over Comp: out_s3_compress_parquet\n(Arrow Table -> Parquet buffer)
else arrow
note over Comp: out_s3_compress_arrow\n(Arrow Table -> Feather)
else gzip
note over Comp: gzip buffer
end
Comp-->>S3: Compressed payload, size
S3->>S3: Validate size (multipart 5GB compressed limit)
S3->>AWS: Upload (PUT Object or Multipart)
AWS-->>S3: Response
S3-->>User: Result
end
else compression == none
S3->>S3: No compression
S3->>S3: Validate size (50MB uncompressed limit for multipart)
S3->>AWS: Upload (PUT Object or Multipart)
AWS-->>S3: Response
S3-->>User: Result
end
```

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
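As a plain-C companion to the diagram, here is a minimal sketch of the flush-time path it describes. ctx->compression and FLB_RETRY come from the plugin code discussed below, and FLB_AWS_COMPRESS_NONE is fluent-bit's standard "no compression" constant; the helper names compress_records() and upload_body() and the exact context fields are assumptions, not the PR's actual code.

```c
/* Sketch only: any non-NONE compression produces a new payload buffer,
 * which is uploaded (PutObject or multipart) and then freed. */
static int flush_body(struct flb_s3 *ctx, char *body, size_t body_size)
{
    int ret;
    void *payload_buf = NULL;
    size_t payload_size = 0;

    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
        /* compress_records() stands in for the gzip/arrow/parquet
         * compressor selected at init time (assumed helper name) */
        ret = compress_records(ctx, body, body_size, &payload_buf, &payload_size);
        if (ret != 0) {
            flb_plg_error(ctx->ins, "failed to compress data");
            return FLB_RETRY;
        }
        body = payload_buf;
        body_size = payload_size;
    }

    ret = upload_body(ctx, body, body_size);  /* assumed: PutObject or multipart */

    if (payload_buf != NULL) {
        flb_free(payload_buf);
    }
    return ret;
}
```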
Force-pushed 85316ab to 0afb495
Force-pushed 243f704 to 4eaed0d
Force-pushed 4eaed0d to e69bbd2
Force-pushed d165b26 to fbaab06
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/out_s3/s3.c (1)
706-714: Critical: validation uses ctx->compression before it's assigned (guard never triggers).

You compute `ret = flb_aws_compression_get_type(tmp)` but then check `ctx->compression` (still holding its old/default value) instead of `ret`. This allows Arrow/Parquet with `use_put_object == FLB_FALSE` to slip past initialization. Compare against `ret` (or assign first, then validate). Apply this minimal fix:
```diff
     ret = flb_aws_compression_get_type(tmp);
     if (ret == -1) {
         flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
         return -1;
     }
-    if (ctx->use_put_object == FLB_FALSE &&
-        (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
-         ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
+    if (ctx->use_put_object == FLB_FALSE &&
+        (ret == FLB_AWS_COMPRESS_ARROW ||
+         ret == FLB_AWS_COMPRESS_PARQUET)) {
         flb_plg_error(ctx->ins,
-                      "use_put_object must be enabled when Apache Arrow or Parquet is enabled");
+                      "use_put_object must be enabled when Apache Arrow or Parquet compression is selected");
         return -1;
     }
     ctx->compression = ret;
```
♻️ Duplicate comments (1)
plugins/out_s3/s3.c (1)
1173-1177: Fix misleading log message (it's reporting chunk size, not upload_chunk_size).

The message says "Pre-compression upload_chunk_size" but prints the pre-compression chunk size. Suggest clarifying wording.
This also reflects the earlier feedback about the Arrow/Parquet fallback branch; it’s now correctly limited to the gzip-only case.
Apply this tweak:
```diff
-    flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
-                 "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
+    flb_plg_info(ctx->ins,
+                 "Pre-compression chunk size was %zu bytes; after compression it's %zu bytes; "
+                 "chunk too small, using PutObject to upload",
+                 preCompress_size, body_size);
```
🧹 Nitpick comments (2)
plugins/out_s3/s3.c (2)
1195-1197: De-duplicate compressed-buffer frees with a single cleanup path.

You correctly free `payload_buf` across all exit paths, but the repetition makes future edits error-prone. Consider a single `cleanup:` block (or a small inline helper) to free the compressed buffer when set. Example pattern:
```c
    int ret = FLB_OK;
    void *payload_buf = NULL;
    bool compressed = false;

    /* ... set compressed=true and payload_buf on success ... */

put_object:
    ret = s3_put_object(...);

cleanup:
    if (compressed && payload_buf != NULL) {
        flb_free(payload_buf);
    }
    return ret;
```

This keeps ownership clear and reduces maintenance overhead.
Also applies to: 1222-1224, 1236-1238, 1246-1248, 1263-1265
3996-3999: Clarify option docs: Arrow/Parquet require PutObject.

Since initialization enforces PutObject for Arrow/Parquet, reflect that in the `compression` option description to prevent config surprises.

```diff
-    "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
-    "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
+    "Compression type for S3 objects. Supported values: 'gzip', 'arrow', 'parquet'. "
+    "'arrow' and 'parquet' are available only if Apache Arrow was enabled at compile time, "
+    "and require 'use_put_object' to be enabled. "
     "Defaults to no compression. "
     "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- CMakeLists.txt (1 hunks)
- plugins/out_s3/s3.c (9 hunks)
- src/aws/compression/arrow/CMakeLists.txt (1 hunks)
- src/aws/compression/arrow/compress.c (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- src/aws/compression/arrow/CMakeLists.txt
- src/aws/compression/arrow/compress.c
- CMakeLists.txt
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-centos-7
- GitHub Check: PR - fuzzing test
🔇 Additional comments (3)
plugins/out_s3/s3.c (3)
735-745: Multipart size caps: compressed vs. uncompressed limit — LGTM.

Good call enforcing 5 GB per-part cap for compressed payloads and retaining 50 MB for uncompressed. This aligns the code with S3 multipart constraints and avoids accidental oversizing.
1130-1141: Unified compression preprocessing for all non-NONE types — LGTM.

Centralizing gzip/arrow/parquet into a single compression pathway simplifies control flow and makes memory handling consistent. The early return on compression failure with FLB_RETRY is appropriate.
1352-1364: Graceful fallback to uncompressed on compression failure — LGTM.

When compression fails in put_all_chunks(), you log and proceed with the original buffer to prevent data loss. Memory ownership transitions (`buffer` -> `payload_buf`) and frees look correct.
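The pattern being praised boils down to the fragment below, written as a sketch with assumed helper names and argument lists; the real put_all_chunks() differs in detail.

```c
/* Sketch: try to compress; on failure, warn and send the original buffer
 * so the chunk is still delivered. */
void *payload_buf = NULL;
size_t payload_size = 0;
char *body = buffer;             /* original serialized chunk */
size_t body_size = buffer_size;

if (compress_records(ctx, buffer, buffer_size,
                     &payload_buf, &payload_size) == 0) {    /* assumed helper */
    body = payload_buf;          /* compressed bytes owned here */
    body_size = payload_size;
}
else {
    flb_plg_warn(ctx->ins, "failed to compress chunk, sending uncompressed");
}

ret = s3_put_object(ctx, tag, create_time, body, body_size);  /* args assumed */

if (payload_buf != NULL) {
    flb_free(payload_buf);       /* the original buffer is freed by the caller */
}
```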
With the Apache Arrow GLib Parquet library, we're able to support the Parquet format in out_s3.
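For context on the "pure C" part: the Arrow GLib / Parquet GLib bindings expose a C API that can write a GArrowTable into an in-memory Parquet buffer. The fragment below is a minimal sketch of that idea, not the code from this PR; it assumes a GArrowTable was already built from the log records, and the calls should be verified against the Arrow GLib version installed.

```c
#include <arrow-glib/arrow-glib.h>
#include <parquet-glib/parquet-glib.h>

/* Sketch: serialize an existing GArrowTable into an in-memory Parquet buffer.
 * Error handling is abbreviated for brevity. */
static GArrowResizableBuffer *table_to_parquet(GArrowTable *table, GError **error)
{
    GArrowResizableBuffer *buffer;
    GArrowBufferOutputStream *sink;
    GArrowSchema *schema;
    GParquetWriterProperties *props;
    GParquetArrowFileWriter *writer;

    buffer = garrow_resizable_buffer_new(0, error);
    if (buffer == NULL) {
        return NULL;
    }
    sink = garrow_buffer_output_stream_new(buffer);
    schema = garrow_table_get_schema(table);
    props = gparquet_writer_properties_new();

    writer = gparquet_arrow_file_writer_new(schema, GARROW_OUTPUT_STREAM(sink),
                                            props, error);
    if (writer != NULL) {
        gparquet_arrow_file_writer_write_table(writer, table, 1024, error);
        gparquet_arrow_file_writer_close(writer, error);
        g_object_unref(writer);
    }

    g_object_unref(props);
    g_object_unref(schema);
    g_object_unref(sink);
    return buffer;   /* Parquet bytes live in this buffer on success */
}
```

Writing into an in-memory resizable buffer rather than a file fits the PutObject path described above, which uploads a single buffered payload.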
Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:
With leaks on macOS, there are no leaks:
With valgrind:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
Apply the ok-package-test label to test for all targets (requires a maintainer to do).

Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Improvements
Documentation