This issue was moved to a discussion.

http over amqp_0_9 proxy #2567

Closed
jandunk opened this issue May 7, 2024 · 1 comment

Comments

jandunk commented May 7, 2024

I've been playing around with Benthos for a couple of days now and I must say that I love the product. I'm now trying to create a configuration that I can use to proxy HTTP over amqp_0_9, but this is proving a bit too challenging given my lack of experience with the product.

Topology:
Client-> Benthos -> RabbitMQ -> Benthos -> Webserver

The idea is that the client sends a request to an http_server input. That config should publish the request to an amqp_0_9 queue with a unique id attached to it. The request is then picked up by another Benthos config, which forwards it to the webserver with an HTTP client and stores the response in another amqp_0_9 queue along with the id. The response should then somehow be picked up by the Benthos config that is listening as a webserver, which should sync_response the webserver's response from the amqp_0_9 queue entry whose id matches the one added at the input. The config should obviously wait some time for the response to arrive.

These are my simplified configs. Please ignore the OPTIONS verb; I use it in some decision making.

# inner config
input:
  http_server:
    allowed_verbs: [ GET, POST, OPTIONS ]
    path: /xxx
    sync_response:
      metadata_headers:
        include_patterns:
          - .*
output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: "delim:\r\n\r\n"
      - switch:
          cases:
            - check: meta("http_server_verb") == "OPTIONS"
              output:
                resource: options
            - output:
                stdout: {}

output_resources:
  - label: options
    amqp_0_9:
      urls:
        - amqp://user:pass@server.local:5672/
      exchange: "" # No default (required)
      exchange_declare:
        enabled: false
        type: direct
        durable: true
      key: test


# edge config
input:
  amqp_0_9:
    urls:
      - amqp://user:pass@server.local:5672/
    queue: "test" # No default (required)
    queue_declare:
      enabled: false
      durable: true
      auto_delete: false

pipeline:
  processors:
    - http:
        url: "https://webserver:8080/some_service"
        tls:
          enabled: true
          skip_cert_verify: true
        verb: OPTIONS
        extract_headers:
          include_patterns:
            - .*
        successful_on:
          - 200

output:
  amqp_0_9:
    urls:
      - amqp://user:pass@server.local:5672/
    exchange: "" # No default (required)
    exchange_declare:
      enabled: false
      type: direct
      durable: true
    key: test_response

The current config adds the request to the test queue and the response to the test_response queue. It does not check whether the response matches the request, so requests and responses could get mixed up if multiple requests are made to the inner input. Also, nothing sends the webserver's response back to the client as a sync response.
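For the id part alone, one possible approach is to generate the id in the inner config and carry it in the AMQP correlation ID property. The fragments below are an untested sketch, not a confirmed solution. They assume a Benthos version whose amqp_0_9 output has a `correlation_id` field and whose amqp_0_9 input exposes the property as `amqp_correlation_id` metadata. The metadata key `request_id` is a name made up for this sketch.

```yaml
# inner config (fragment): tag every request before it is published
pipeline:
  processors:
    - mapping: |
        # request_id is a hypothetical metadata key picked for this sketch
        meta request_id = uuid_v4()

output_resources:
  - label: options
    amqp_0_9:
      urls:
        - amqp://user:pass@server.local:5672/
      exchange: ""
      key: test
      correlation_id: ${! meta("request_id") }
---
# edge config (fragment): echo the id back alongside the response
output:
  amqp_0_9:
    urls:
      - amqp://user:pass@server.local:5672/
    exchange: ""
    key: test_response
    # assumes amqp_correlation_id, set by the amqp_0_9 input, is still
    # present in the metadata after the http processor has run
    correlation_id: ${! meta("amqp_correlation_id") }
```

This only labels the messages so that a response can be traced back to its request. It does not make the inner config wait for the matching response.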

Could someone help me to implement the logic for injecting and matching the id and syncing the response from the test_response queue in the inner config? I would already be happy with some pointers in the right direction.

Collaborator

mihaitodor commented May 9, 2024

Hey @jandunk 👋 Please find some answers inline:

> This should then somehow be picked up by the Benthos config which is listening as a webserver and should sync_response the webserver's response from the amqp_0_9 queue which contains the response matching the id added at the input.

I'm not sure I follow. Is this a 3rd Benthos instance? The one you listed above as "inner config" uses amqp_0_9 as the output (and stdout) and there's no sync_response output there.

> Could someone help me to implement the logic for injecting and matching the id and syncing the response from the test_response queue in the inner config?

I don't think there's currently a way to a) send a message to an amqp_0_9 output, b) wait for a response on another amqp_0_9 queue, and then c) do a sync_response to the http_server input. I suppose one could hack something together using Benthos in Streams Mode, where one stream has the http_server input and, after writing a message over AMQP, reads from a shared cache in a while loop, but I wouldn't want to implement something like that.
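For reference, the Streams Mode hack described above could look roughly like the following. This is an untested sketch under several assumptions: the file names, the `responses` cache label, and the `request_id` metadata key are hypothetical. The edge config is assumed to copy the incoming `amqp_correlation_id` metadata into the `correlation_id` of the message it publishes to test_response. The `retry` processor, which only exists in recent versions, stands in for the while loop.

```yaml
# resources.yaml (hypothetical file name), shared by every stream
cache_resources:
  - label: responses              # hypothetical label
    memory:
      default_ttl: 60s            # drop responses nobody collected

output_resources:
  - label: options
    amqp_0_9:
      urls:
        - amqp://user:pass@server.local:5672/
      exchange: ""
      key: test
      correlation_id: ${! meta("request_id") }
---
# streams/inner.yaml (hypothetical): HTTP front end
input:
  http_server:
    path: /xxx
    allowed_verbs: [ GET, POST, OPTIONS ]
    timeout: 35s                  # must outlast the polling window below
    sync_response:
      metadata_headers:
        include_patterns:
          - .*

pipeline:
  processors:
    - mapping: |
        meta request_id = uuid_v4()

output:
  broker:
    pattern: fan_out_sequential   # publish first, then wait
    outputs:
      - resource: options
      - sync_response: {}
        processors:
          # poll the shared cache until the response shows up or time runs out
          - retry:
              backoff:
                initial_interval: 100ms
                max_interval: 1s
                max_elapsed_time: 30s
              processors:
                - cache:
                    resource: responses
                    operator: get
                    key: ${! meta("request_id") }
---
# streams/collector.yaml (hypothetical): files responses under their id
input:
  amqp_0_9:
    urls:
      - amqp://user:pass@server.local:5672/
    queue: test_response

output:
  cache:
    target: responses
    key: ${! meta("amqp_correlation_id") }
```

Note that the cache stores only the response body, so the webserver's headers would be lost unless they are folded into the payload first. What the client receives when the polling window expires is not handled here.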

The best thing to do is to develop a custom processor plugin based on the existing amqp_0_9 output, with some additions such that it can a) write messages to AMQP just like the current output and b) block until it receives the expected response on another queue. Then you can use the sync_response output as expected. Since the APIs needed to replicate the sync_response output behaviour are internal, implementing it as a processor is the only reasonable approach currently. Not sure what edge cases you might encounter when considering parallel processing and retries...

LE: Moving this to a discussion as per #2026.

@redpanda-data redpanda-data locked and limited conversation to collaborators May 9, 2024
@mihaitodor mihaitodor converted this issue into discussion #2570 May 9, 2024
