Skip to content

Commit 9491310

Browse files
authored
Experimental feature of rows blocks be appended directly to the batch (#1233)
1 parent 22a3351 commit 9491310

File tree

2 files changed

+117
-0
lines changed

2 files changed

+117
-0
lines changed

conn_batch.go

+37
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ func (b *batch) Append(v ...any) error {
133133
if b.err != nil {
134134
return b.err
135135
}
136+
137+
if len(v) > 0 {
138+
if r, ok := v[0].(*rows); ok {
139+
return b.appendRowsBlocks(r)
140+
}
141+
}
142+
136143
if err := b.block.Append(v...); err != nil {
137144
b.err = errors.Wrap(ErrBatchInvalid, err.Error())
138145
b.release(err)
@@ -141,6 +148,36 @@ func (b *batch) Append(v ...any) error {
141148
return nil
142149
}
143150

151+
// appendRowsBlocks is an experimental feature that allows rows blocks be appended directly to the batch.
152+
// This API is not stable and may be changed in the future.
153+
// See: tests/batch_block_test.go
154+
func (b *batch) appendRowsBlocks(r *rows) error {
155+
var lastReadLock *proto.Block
156+
var blockNum int
157+
158+
for r.Next() {
159+
if lastReadLock == nil { // make sure the first block is logged
160+
b.conn.debugf("[batch.appendRowsBlocks] blockNum = %d", blockNum)
161+
}
162+
163+
// rows.Next() will read the next block from the server only if the current block is empty
164+
// only if new block is available we should flush the current block
165+
// the last block will be handled by the batch.Send() method
166+
if lastReadLock != nil && lastReadLock != r.block {
167+
if err := b.Flush(); err != nil {
168+
return err
169+
}
170+
blockNum++
171+
b.conn.debugf("[batch.appendRowsBlocks] blockNum = %d", blockNum)
172+
}
173+
174+
b.block = r.block
175+
lastReadLock = r.block
176+
}
177+
178+
return nil
179+
}
180+
144181
func (b *batch) AppendStruct(v any) error {
145182
if b.err != nil {
146183
return b.err

tests/batch_block_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to ClickHouse, Inc. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. ClickHouse, Inc. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package tests
19+
20+
import (
21+
"sync/atomic"
22+
"testing"
23+
24+
"github.com/ClickHouse/clickhouse-go/v2"
25+
"github.com/stretchr/testify/assert"
26+
"github.com/stretchr/testify/require"
27+
"golang.org/x/net/context"
28+
)
29+
30+
// TestBatchAppendRows tests experimental batch rows blocks append feature.
31+
// This API is not stable and may be changed in the future.
32+
func TestBatchAppendRows(t *testing.T) {
33+
te, err := GetTestEnvironment(testSet)
34+
require.NoError(t, err)
35+
blocksRead := atomic.Uint64{}
36+
opts := ClientOptionsFromEnv(te, clickhouse.Settings{})
37+
opts.Debug = true
38+
opts.Debugf = func(format string, args ...interface{}) {
39+
if format == "[batch.appendRowsBlocks] blockNum = %d" {
40+
blocksRead.Store(uint64(args[0].(int))) // store the last block number read from rows
41+
}
42+
}
43+
44+
conn, err := GetConnectionWithOptions(&opts)
45+
require.NoError(t, err)
46+
47+
ctx := context.Background()
48+
49+
// given we have two tables and a million rows in the source table
50+
var tables = []string{"source", "target"}
51+
for _, table := range tables {
52+
require.NoError(t, conn.Exec(context.Background(), "create table if not exists "+table+" (number1 Int, number2 String, number3 Tuple(String, Int), number4 DateTime) engine = MergeTree() order by tuple()"))
53+
defer conn.Exec(context.Background(), "drop table if exists "+table)
54+
}
55+
56+
require.NoError(t, conn.Exec(ctx, "INSERT INTO source SELECT number, 'string', tuple('foo', number), now() FROM system.numbers LIMIT 1000000"))
57+
58+
// when we create a batch with direct data block access 10 times
59+
60+
selectCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
61+
"max_block_size": 1000,
62+
}))
63+
64+
sourceRows, err := conn.Query(selectCtx, "SELECT * FROM source")
65+
require.NoError(t, err)
66+
defer sourceRows.Close()
67+
68+
b, err := conn.PrepareBatch(ctx, "INSERT INTO target")
69+
require.NoError(t, err)
70+
require.NoError(t, b.Append(sourceRows))
71+
require.NoError(t, b.Send())
72+
73+
// then we should be able to see the data in the target table
74+
row := conn.QueryRow(ctx, "SELECT count() FROM source")
75+
require.NoError(t, row.Err())
76+
var count uint64
77+
require.NoError(t, row.Scan(&count))
78+
assert.Equal(t, uint64(1000000), count)
79+
assert.Equal(t, uint64(999), blocksRead.Load())
80+
}

0 commit comments

Comments
 (0)