Skip to content

Commit 2af140f

Browse files
committed
Merge remote-tracking branch 'origin/main'
2 parents 71e57b0 + ff98a2c commit 2af140f

File tree

8 files changed

+192
-6
lines changed

8 files changed

+192
-6
lines changed

CHANGELOG.md

+20
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,23 @@
1+
# v2.22.2, 2024-03-18 <!-- Release notes generated using configuration in .github/release.yml at main -->
2+
3+
## What's Changed
4+
### Fixes 🐛
5+
* Fix for Map columns with Enums by @leklund in https://github.com/ClickHouse/clickhouse-go/pull/1236
6+
7+
## New Contributors
8+
* @leklund made their first contribution in https://github.com/ClickHouse/clickhouse-go/pull/1236
9+
10+
**Full Changelog**: https://github.com/ClickHouse/clickhouse-go/compare/v2.22.1...v2.22.2
11+
12+
# v2.22.1, 2024-03-18 <!-- Release notes generated using configuration in .github/release.yml at main -->
13+
14+
## What's Changed
15+
### Fixes 🐛
16+
* Make errors channel buffered inside query() by @threadedstream in https://github.com/ClickHouse/clickhouse-go/pull/1237
17+
18+
19+
**Full Changelog**: https://github.com/ClickHouse/clickhouse-go/compare/v2.22.0...v2.22.1
20+
121
# v2.20.0, 2024-02-28 <!-- Release notes generated using configuration in .github/release.yml at main -->
222

323
## What's Changed

client_info.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ const ClientName = "clickhouse-go"
2929

3030
const (
3131
ClientVersionMajor = 2
32-
ClientVersionMinor = 20
33-
ClientVersionPatch = 0
32+
ClientVersionMinor = 22
33+
ClientVersionPatch = 2
3434
ClientTCPProtocolVersion = proto.DBMS_TCP_PROTOCOL_VERSION
3535
)
3636

conn_batch.go

+37
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ func (b *batch) Append(v ...any) error {
133133
if b.err != nil {
134134
return b.err
135135
}
136+
137+
if len(v) > 0 {
138+
if r, ok := v[0].(*rows); ok {
139+
return b.appendRowsBlocks(r)
140+
}
141+
}
142+
136143
if err := b.block.Append(v...); err != nil {
137144
b.err = errors.Wrap(ErrBatchInvalid, err.Error())
138145
b.release(err)
@@ -141,6 +148,36 @@ func (b *batch) Append(v ...any) error {
141148
return nil
142149
}
143150

151+
// appendRowsBlocks is an experimental feature that allows rows blocks be appended directly to the batch.
152+
// This API is not stable and may be changed in the future.
153+
// See: tests/batch_block_test.go
154+
func (b *batch) appendRowsBlocks(r *rows) error {
155+
var lastReadLock *proto.Block
156+
var blockNum int
157+
158+
for r.Next() {
159+
if lastReadLock == nil { // make sure the first block is logged
160+
b.conn.debugf("[batch.appendRowsBlocks] blockNum = %d", blockNum)
161+
}
162+
163+
// rows.Next() will read the next block from the server only if the current block is empty
164+
// only if new block is available we should flush the current block
165+
// the last block will be handled by the batch.Send() method
166+
if lastReadLock != nil && lastReadLock != r.block {
167+
if err := b.Flush(); err != nil {
168+
return err
169+
}
170+
blockNum++
171+
b.conn.debugf("[batch.appendRowsBlocks] blockNum = %d", blockNum)
172+
}
173+
174+
b.block = r.block
175+
lastReadLock = r.block
176+
}
177+
178+
return nil
179+
}
180+
144181
func (b *batch) AppendStruct(v any) error {
145182
if b.err != nil {
146183
return b.err

conn_query.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (c *connect) query(ctx context.Context, release func(*connect, error), quer
6565
bufferSize = options.blockBufferSize
6666
}
6767
var (
68-
errors = make(chan error)
68+
errors = make(chan error, 1)
6969
stream = make(chan *proto.Block, bufferSize)
7070
)
7171

contributors/list

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rogeryk <rogeryk@outlook.com>
1+
Lukas Eklund <leklund@gmail.com>

lib/column/map.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,17 @@ func (col *Map) Name() string {
6666

6767
func (col *Map) parse(t Type, tz *time.Location) (_ Interface, err error) {
6868
col.chType = t
69-
if types := strings.SplitN(t.params(), ",", 2); len(types) == 2 {
69+
types := make([]string, 2, 2)
70+
typeParams := t.params()
71+
idx := strings.Index(typeParams, ",")
72+
if strings.HasPrefix(typeParams, "Enum") {
73+
idx = strings.Index(typeParams, "),") + 1
74+
}
75+
if idx > 0 {
76+
types[0] = typeParams[:idx]
77+
types[1] = typeParams[idx+1:]
78+
}
79+
if types[0] != "" && types[1] != "" {
7080
if col.keys, err = Type(strings.TrimSpace(types[0])).Column(col.name, tz); err != nil {
7181
return nil, err
7282
}

tests/batch_block_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to ClickHouse, Inc. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. ClickHouse, Inc. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package tests
19+
20+
import (
21+
"sync/atomic"
22+
"testing"
23+
24+
"github.com/ClickHouse/clickhouse-go/v2"
25+
"github.com/stretchr/testify/assert"
26+
"github.com/stretchr/testify/require"
27+
"golang.org/x/net/context"
28+
)
29+
30+
// TestBatchAppendRows tests experimental batch rows blocks append feature.
31+
// This API is not stable and may be changed in the future.
32+
func TestBatchAppendRows(t *testing.T) {
33+
te, err := GetTestEnvironment(testSet)
34+
require.NoError(t, err)
35+
blocksRead := atomic.Uint64{}
36+
opts := ClientOptionsFromEnv(te, clickhouse.Settings{})
37+
opts.Debug = true
38+
opts.Debugf = func(format string, args ...interface{}) {
39+
if format == "[batch.appendRowsBlocks] blockNum = %d" {
40+
blocksRead.Store(uint64(args[0].(int))) // store the last block number read from rows
41+
}
42+
}
43+
44+
conn, err := GetConnectionWithOptions(&opts)
45+
require.NoError(t, err)
46+
47+
ctx := context.Background()
48+
49+
// given we have two tables and a million rows in the source table
50+
var tables = []string{"source", "target"}
51+
for _, table := range tables {
52+
require.NoError(t, conn.Exec(context.Background(), "create table if not exists "+table+" (number1 Int, number2 String, number3 Tuple(String, Int), number4 DateTime) engine = MergeTree() order by tuple()"))
53+
defer conn.Exec(context.Background(), "drop table if exists "+table)
54+
}
55+
56+
require.NoError(t, conn.Exec(ctx, "INSERT INTO source SELECT number, 'string', tuple('foo', number), now() FROM system.numbers LIMIT 1000000"))
57+
58+
// when we create a batch with direct data block access 10 times
59+
60+
selectCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
61+
"max_block_size": 1000,
62+
}))
63+
64+
sourceRows, err := conn.Query(selectCtx, "SELECT * FROM source")
65+
require.NoError(t, err)
66+
defer sourceRows.Close()
67+
68+
b, err := conn.PrepareBatch(ctx, "INSERT INTO target")
69+
require.NoError(t, err)
70+
require.NoError(t, b.Append(sourceRows))
71+
require.NoError(t, b.Send())
72+
73+
// then we should be able to see the data in the target table
74+
row := conn.QueryRow(ctx, "SELECT count() FROM source")
75+
require.NoError(t, row.Err())
76+
var count uint64
77+
require.NoError(t, row.Scan(&count))
78+
assert.Equal(t, uint64(1000000), count)
79+
assert.Equal(t, uint64(999), blocksRead.Load())
80+
}

tests/map_test.go

+40-1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ func TestColumnarMap(t *testing.T) {
122122
Col1 Map(String, UInt64)
123123
, Col2 Map(String, UInt64)
124124
, Col3 Map(String, UInt64)
125+
, Col4 Map(Enum16('one' = 1, 'two' = 2), UInt64)
126+
, Col5 Map(String, Enum16('one' = 1, 'two' = 2))
127+
, Col6 Map(Enum8('one' = 1, 'two' = 2), Enum8('red' = 1, 'blue' = 2))
125128
) Engine MergeTree() ORDER BY tuple()
126129
`
127130
defer func() {
@@ -134,6 +137,9 @@ func TestColumnarMap(t *testing.T) {
134137
col1Data = []map[string]uint64{}
135138
col2Data = []map[string]uint64{}
136139
col3Data = []map[string]uint64{}
140+
col4Data = []map[string]uint64{}
141+
col5Data = []map[string]string{}
142+
col6Data = []map[string]string{}
137143
)
138144
for i := 0; i < 100; i++ {
139145
col1Data = append(col1Data, map[string]uint64{
@@ -145,17 +151,35 @@ func TestColumnarMap(t *testing.T) {
145151
fmt.Sprintf("key_col_2_%d_2", i): uint64(i),
146152
})
147153
col3Data = append(col3Data, map[string]uint64{})
154+
col4Data = append(col4Data, map[string]uint64{
155+
"one": uint64(i),
156+
"two": uint64(i),
157+
})
158+
col5Data = append(col5Data, map[string]string{
159+
fmt.Sprintf("key_col_2_%d_1", i): "one",
160+
fmt.Sprintf("key_col_2_%d_2", i): "two",
161+
})
162+
col6Data = append(col6Data, map[string]string{
163+
"one": "red",
164+
"two": "blue",
165+
})
148166
}
149167
require.NoError(t, batch.Column(0).Append(col1Data))
150168
require.NoError(t, batch.Column(1).Append(col2Data))
151169
require.NoError(t, batch.Column(2).Append(col3Data))
170+
require.NoError(t, batch.Column(3).Append(col4Data))
171+
require.NoError(t, batch.Column(4).Append(col5Data))
172+
require.NoError(t, batch.Column(5).Append(col6Data))
152173
require.Equal(t, 100, batch.Rows())
153174
require.NoError(t, batch.Send())
154175
{
155176
var (
156177
col1 map[string]uint64
157178
col2 map[string]uint64
158179
col3 map[string]uint64
180+
col4 map[string]uint64
181+
col5 map[string]string
182+
col6 map[string]string
159183
col1Data = map[string]uint64{
160184
"key_col_1_10_1": 10,
161185
"key_col_1_10_2": 10,
@@ -165,11 +189,26 @@ func TestColumnarMap(t *testing.T) {
165189
"key_col_2_10_2": 10,
166190
}
167191
col3Data = map[string]uint64{}
192+
col4Data = map[string]uint64{
193+
"one": 10,
194+
"two": 10,
195+
}
196+
col5Data = map[string]string{
197+
"key_col_2_10_1": "one",
198+
"key_col_2_10_2": "two",
199+
}
200+
col6Data = map[string]string{
201+
"one": "red",
202+
"two": "blue",
203+
}
168204
)
169-
require.NoError(t, conn.QueryRow(ctx, "SELECT * FROM test_map WHERE Col1['key_col_1_10_1'] = $1", 10).Scan(&col1, &col2, &col3))
205+
require.NoError(t, conn.QueryRow(ctx, "SELECT * FROM test_map WHERE Col1['key_col_1_10_1'] = $1", 10).Scan(&col1, &col2, &col3, &col4, &col5, &col6))
170206
assert.Equal(t, col1Data, col1)
171207
assert.Equal(t, col2Data, col2)
172208
assert.Equal(t, col3Data, col3)
209+
assert.Equal(t, col4Data, col4)
210+
assert.Equal(t, col5Data, col5)
211+
assert.Equal(t, col6Data, col6)
173212
}
174213
}
175214

0 commit comments

Comments
 (0)