Skip to content

Commit 02efdee

Browse files
Retry on broken pipe in batch (#1423)
* Release connections on error during Flush()
* Add test to illustrate broken batch flushes
* Use a dedicated test environment for broken connection test recover #1421

---------

Co-authored-by: Robert Gettys <rgettys@tesla.com>
1 parent 269b0f3 commit 02efdee

File tree

3 files changed

+79
-5
lines changed

3 files changed

+79
-5
lines changed

conn_batch.go

+5
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ import (
2323
"os"
2424
"regexp"
2525
"slices"
26+
"syscall"
2627
"time"
2728

2829
"github.com/pkg/errors"
@@ -286,6 +287,10 @@ func (b *batch) Flush() error {
286287
}
287288
if b.block.Rows() != 0 {
288289
if err := b.conn.sendData(b.block, ""); err != nil {
290+
// broken pipe/conn reset aren't generally recoverable on retry
291+
if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
292+
b.release(err)
293+
}
289294
return err
290295
}
291296
if b.closeOnFlush {

tests/issues/1421_test.go

+67
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,67 @@
1+
package issues
2+
3+
import (
4+
"context"
5+
"errors"
6+
"os"
7+
"syscall"
8+
"testing"
9+
10+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
11+
"github.com/ClickHouse/clickhouse-go/v2/tests"
12+
"github.com/docker/docker/api/types/container"
13+
"github.com/stretchr/testify/require"
14+
"github.com/testcontainers/testcontainers-go"
15+
)
16+
17+
//goland:noinspection ALL
18+
const insertQry = "INSERT INTO test (foo, foo2)"
19+
20+
func Test1421BatchFlushBrokenConn(t *testing.T) {
21+
// create a dedicated test environment for this test
22+
// note: test environment management is a bit messy, consider refactoring
23+
env, err := tests.CreateClickHouseTestEnvironment(t.Name())
24+
tests.SetTestEnvironment(t.Name(), env)
25+
require.NoError(t, tests.CreateDatabase(t.Name()))
26+
27+
require.NoError(t, err)
28+
require.NotNil(t, env)
29+
ctx := context.Background()
30+
client, err := testcontainers.NewDockerClientWithOpts(ctx)
31+
require.NoError(t, err)
32+
chClient, err := tests.TestClientWithDefaultSettings(env)
33+
34+
err = chClient.Exec(ctx, "CREATE TABLE test (foo String, foo2 String) ENGINE = MergeTree ORDER BY (foo)")
35+
require.NoError(t, err)
36+
batch, err := chClient.PrepareBatch(ctx, insertQry, driver.WithCloseOnFlush())
37+
require.NoError(t, err)
38+
err = batch.Append("bar", "bar")
39+
require.NoError(t, err)
40+
err = batch.Flush()
41+
require.NoError(t, err)
42+
err = batch.Append("bar2", "bar2")
43+
require.NoError(t, err)
44+
err = batch.Flush()
45+
require.NoError(t, err)
46+
47+
err = batch.Append(RandAsciiString(200000000), RandAsciiString(20000000))
48+
49+
require.NoError(t, err)
50+
ch := make(chan struct{})
51+
go func() {
52+
err = batch.Flush()
53+
close(ch)
54+
}()
55+
//timeout := 0
56+
err2 := client.ContainerKill(ctx, env.ContainerID, "KILL")
57+
<-ch
58+
require.NoError(t, err2)
59+
require.True(t, errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET))
60+
err = client.ContainerStart(ctx, env.ContainerID, container.StartOptions{})
61+
require.NoError(t, err)
62+
err = batch.Flush()
63+
// retry after server is up should have either no error, or a reconnect error (for example because the mapped port
64+
// changed on container startup)
65+
require.True(t, err == nil || errors.Is(err, syscall.ECONNREFUSED) || os.IsTimeout(err), err)
66+
67+
}

tests/utils.go

+7-5
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,7 @@ func GetClickHouseTestVersion() string {
6161
}
6262

6363
type ClickHouseTestEnvironment struct {
64+
ContainerID string
6465
Port int
6566
HttpPort int
6667
SslPort int
@@ -203,11 +204,12 @@ func CreateClickHouseTestEnvironment(testSet string) (ClickHouseTestEnvironment,
203204
hps, _ := clickhouseContainer.MappedPort(ctx, "8443")
204205
ip, _ := clickhouseContainer.ContainerIP(ctx)
205206
testEnv := ClickHouseTestEnvironment{
206-
Port: p.Int(),
207-
HttpPort: hp.Int(),
208-
SslPort: sslPort.Int(),
209-
HttpsPort: hps.Int(),
210-
Host: "127.0.0.1",
207+
ContainerID: clickhouseContainer.GetContainerID(),
208+
Port: p.Int(),
209+
HttpPort: hp.Int(),
210+
SslPort: sslPort.Int(),
211+
HttpsPort: hps.Int(),
212+
Host: "127.0.0.1",
211213
// we set this explicitly - note its also set in the /etc/clickhouse-server/users.d/admin.xml
212214
Username: "default",
213215
Password: "ClickHouse",

0 commit comments

Comments
 (0)