Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor: add conditional processing support for logs #10142

Merged
merged 16 commits into from
Mar 28, 2025

Conversation

niedbalski
Copy link
Collaborator

@niedbalski niedbalski commented Mar 28, 2025

Processor conditional processing for logs

Fix #10127

This PR adds conditional processing support for logs in processors. This feature allows processors to conditionally process logs based on field values.

Example configurations

  1. Simple
service:
  log_level: info
pipeline:
  inputs:
    - name: dummy
      dummy: '{"request": {"method": "GET", "path": "/api/v1/resource", "headers": {"Authorization": "Bearer valid-token"}, "access": "granted"}}'
      tag: error.msg
      processors:
        logs:
          - name: content_modifier
            action: insert
            key: modified_if_post
            value: true
            condition:
              op: and
              rules:
                - field: "$request['method']"
                  op: eq
                  value: "POST"

          - name: content_modifier
            action: insert
            key: modified_if_get
            value: true
            condition:
              op: and
              rules:
                - field: "$request['method']"
                  op: eq
                  value: "GET"

  outputs:
    - name: stdout
      match: '*'

When this configuration runs, the processor only adds the field modified_if_get to records where the request method is GET:

[0] error.msg: [[1743160978.853138000, {}], {"request"=>{"method"=>"GET", "path"=>"/api/v1/resource", "headers"=>{"Authorization"=>"Bearer valid-token"}, "access"=>"granted"}, "modified_if_get"=>"true"}]
[0] error.msg: [[1743160979.853940000, {}], {"request"=>{"method"=>"GET", "path"=>"/api/v1/resource", "headers"=>{"Authorization"=>"Bearer valid-token"}, "access"=>"granted"}, "modified_if_get"=>"true"}]
  1. More complex
service:
  log_level: info
pipeline:
  inputs:
    - name: dummy
      dummy: '{"request": {"method": "GET", "path": "/api/v1/resource", "headers": {"Authorization": "Bearer valid-token", "Content-Type": "application/json"}, "status_code": 200, "response_time": 150}}'
      tag: request.log
      processors:
        logs:
          - name: content_modifier
            action: insert
            key: high_priority_method
            value: true
            condition:
              op: and
              rules:
                - field: "$request['method']"
                  op: in
                  value: ["POST", "PUT", "DELETE"]

          - name: content_modifier
            action: insert
            key: requires_performance_check
            value: true
            condition:
              op: or
              rules:
                - field: "$request['response_time']"
                  op: gt
                  value: 100
                - field: "$request['status_code']"
                  op: gte
                  value: 400

  outputs:
    - name: stdout

Outputs:

  "headers"=>{"Authorization"=>"Bearer valid-token", "Content-Type"=>"application/json"}, "status_code"=>200, "response_time"=>150},
  "requires_performance_check"=>"true"}]
  [0] request.log: [[1743163088.501242000, {}], {"request"=>{"method"=>"GET", "path"=>"/api/v1/resource", "headers"=>{"Authorization"=>"Bearer valid-token",
  "Content-Type"=>"application/json"}, "status_code"=>200, "response_time"=>150}, "requires_performance_check"=>"true"}]
  [0] request.log: [[1743163089.501156000, {}], {"request"=>{"method"=>"GET", "path"=>"/api/v1/resource", "headers"=>{"Authorization"=>"Bearer valid-token",
  "Content-Type"=>"application/json"}, "status_code"=>200, "response_time"=>150}, "requires_performance_check"=>"true"}]
  1. The high_priority_method field was NOT added to the records because our input has "method":"GET" which isn't in the array ["POST", "PUT", "DELETE"] specified
    in the first condition.

  2. The requires_performance_check field WAS added with value "true" because the second condition is met:

    • We have an OR condition with two rules
    • The first rule checks if response_time > 100 (our value is 150)
    • Since this rule is true, the whole condition evaluates to true, regardless of the second rule
    • This matches what we expected: when response time > 100, flag the request for performance checking

Added tests

Internal unit tests

  • Added test suite for processor condition validation in tests/internal/processor_conditional.c
  • Created tests for condition operators (and, or)
  • Added tests for rule operators (eq, neq, gt, lt, gte, lte, regex, not_regex, in, not_in)
  • Included tests for fields with $ prefix and record accessors
  • Added tests for error cases:
    • Invalid operator validation
    • Empty rules arrays
    • Multiple rules handling
    • Context metadata handling
    • Deeply nested field access
    • Overwriting existing conditions
    • Missing fields/operators/values
    • Invalid rule structures
    • Invalid regex patterns
    • Array values for numeric operators
    • String values for 'in' operator
    • Complex nested conditions

Runtime shell tests

  • Added runtime shell tests in tests/runtime_shell/processor_conditional.sh
  • Tests processor condition functionality with actual Fluent Bit instances

Do not strip $ prefix from field names when parsing conditions. This ensures
that field names with $ prefix are correctly passed to the condition evaluator.
Convert all debug statements to trace level in the conditional evaluation
code to reduce log noise while allowing detailed tracing when needed.
Add 'condition' to the list of special properties that are handled
by the processor and bypass plugin-specific validation.
Convert all debug statements related to condition evaluation to
trace level in the MessagePack processing code.
Add support for conditional filtering in MP chunk objects to allow
processors to selectively process records that match a condition.
Add condition field to processor unit structure to support conditional
processing of records based on their content.
Use the actual string size from the record accessor value instead of
calling flb_sds_len() to prevent potential buffer overflows or crashes.
Change log messages about condition processing from info to debug level
to reduce noise in the logs.
- Declare variables at the beginning of the function
- Change for loop declarations to C89 style
- Fix format specifier for uint64_t
Declare loop variables at the beginning of functions rather than within
the for loops to ensure compatibility with C89 standard required for CentOS 7
builds.
Fixed memory leak in flb_processor_unit_set_condition when handling array
values for the 'in' and 'not_in' operators. The issue was that the cleanup
code relied on rule_val still pointing to the array when it might have been
reassigned during the context check. Added a flag to track array allocations
and fixed all test cleanup to properly free or ignore the condition based on
ownership.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement conditional processing at the processor level for logs
2 participants