Skip to content

Commit

Permalink
Merge pull request #19691 from guswynn/hidden-host-test
Browse files Browse the repository at this point in the history
storage: test that we don't resolve remote hosts when using ssh tunnels
  • Loading branch information
benesch authored Jun 3, 2023
2 parents 5db9c2d + e2d8b69 commit 9ab5f83
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 10 deletions.
20 changes: 13 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ proptest-derive = { git = "https://github.com/MaterializeInc/proptest.git" }
# Waiting on https://github.com/edenhill/librdkafka/pull/4051.
rdkafka = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" }
rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" }

# Waiting on https://github.com/openssh-rust/openssh/pull/120 to make it into
# a release.
openssh = { git = "https://github.com/MaterializeInc/openssh.git" }
2 changes: 1 addition & 1 deletion src/mz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rust-version.workspace = true
anyhow = "1.0.66"
axum = { version = "0.6.7" }
clap = { version = "3.2.24", features = [ "derive" ] }
dirs = "4.0.0"
dirs = "5.0.0"
indicatif = "0.17.2"
mz-build-info = { path = "../build-info" }
mz-ore = { path = "../ore", features = ["async", "cli"] }
Expand Down
4 changes: 2 additions & 2 deletions src/ssh-util/src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ async fn connect(
// Force use of IPv4 loopback. Do not use the hostname `localhost`,
// as that can resolve to IPv6, and the SSH tunnel is only listening
// for IPv4 connections.
let local = openssh::Socket::new(&(Ipv4Addr::LOCALHOST, local_port))?;
let remote = openssh::Socket::new(&(host, port))?;
let local = openssh::Socket::from((Ipv4Addr::LOCALHOST, local_port));
let remote = openssh::Socket::new(host, port);

match session
.request_port_forward(ForwardType::Local, local, remote)
Expand Down
54 changes: 54 additions & 0 deletions test/ssh-connection/hidden-hosts.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test creating a source that uses Kafka/CSR via an SSH tunnel where the Kafka
# broker and CSR server have hostnames that can only be resolved from the
# SSH bastion host.
#
# The `openssh` crate previously had a bug where it would attempt to resolve
# the target host on the client.
# See: https://github.com/openssh-rust/openssh/pull/120

$ kafka-create-topic topic=thetopic

$ kafka-ingest topic=thetopic format=bytes
one

> CREATE CONNECTION kafka_conn
TO KAFKA (BROKER 'hidden-kafka:9092' USING SSH TUNNEL thancred);


> CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL 'http://hidden-schema-registry:8081',
SSH TUNNEL thancred
);

$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":"string"},
{"name":"f2", "type":"long"}
]
}

$ kafka-create-topic topic=avroavro

$ kafka-ingest format=avro topic=avroavro schema=${schema}
{"f1": "fish", "f2": 1000}

> CREATE SOURCE csr_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE

> SELECT * FROM csr_source
f1 f2
----------
fish 1000
40 changes: 40 additions & 0 deletions test/ssh-connection/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,45 @@ def workflow_kafka_csr_via_ssh_tunnel(c: Composition, redpanda: bool = False) ->
c.run("testdrive", "--no-reset", "kafka-source-after-ssh-restart.td")


def workflow_hidden_hosts(c: Composition, redpanda: bool = False) -> None:
c.down()
dependencies = ["materialized", "ssh-bastion-host"]
if redpanda:
dependencies += ["redpanda"]
else:
dependencies += ["zookeeper", "kafka", "schema-registry"]
c.up(*dependencies)

c.run("testdrive", "setup.td")

public_key = c.sql_query("select public_key_1 from mz_ssh_tunnel_connections;")[0][
0
]

c.exec(
"ssh-bastion-host",
"bash",
"-c",
f"echo '{public_key}' > /etc/authorized_keys/mz",
)

def add_hidden_host(container: str) -> None:
ip = c.exec(
"ssh-bastion-host", "getent", "hosts", container, capture=True
).stdout.split(" ")[0]
c.exec(
"ssh-bastion-host",
"bash",
"-c",
f"echo '{ip} hidden-{container}' >> /etc/hosts",
)

add_hidden_host("kafka")
add_hidden_host("schema-registry")

c.run("testdrive", "--no-reset", "hidden-hosts.td")


# Test that if we restart the bastion AND change its server keys(s), we can
# still reconnect in the replication stream.
def workflow_pg_restart_bastion(c: Composition) -> None:
Expand Down Expand Up @@ -268,3 +307,4 @@ def workflow_default(c: Composition) -> None:
workflow_pg_via_ssh_tunnel(c)
workflow_pg_via_ssh_tunnel_with_ssl(c)
workflow_pg_restart_bastion(c)
workflow_hidden_hosts(c)

0 comments on commit 9ab5f83

Please sign in to comment.