Skip to content

Commit 4c8219e

Browse files
authored
Stream HTTP response body read for decompression (#1213)
* Create a reader instead of reading the whole Response.Body up front, to avoid huge memory consumption
* Fix test
1 parent 40eda3f commit 4c8219e

File tree

3 files changed

+70
-81
lines changed

3 files changed

+70
-81
lines changed

clickhouse_rows.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ func (r *rows) Next() (result bool) {
4646
}
4747
next:
4848
if r.row >= r.block.Rows() {
49+
if r.stream == nil {
50+
return false
51+
}
4952
select {
5053
case err := <-r.errors:
5154
if err != nil {
@@ -95,7 +98,16 @@ func (r *rows) Columns() []string {
9598
}
9699

97100
func (r *rows) Close() error {
98-
active := 2
101+
if r.errors == nil && r.stream == nil {
102+
return r.err
103+
}
104+
active := 0
105+
if r.errors != nil {
106+
active++
107+
}
108+
if r.stream != nil {
109+
active++
110+
}
99111
for {
100112
select {
101113
case _, ok := <-r.stream:

conn_http.go

+21-45
Original file line numberDiff line numberDiff line change
@@ -75,49 +75,32 @@ type HTTPReaderWriter struct {
7575
method CompressionMethod
7676
}
7777

78-
func (rw HTTPReaderWriter) read(res *http.Response) ([]byte, error) {
78+
// NewReader will return a reader that will decompress data if needed.
79+
func (rw *HTTPReaderWriter) NewReader(res *http.Response) (io.Reader, error) {
7980
enc := res.Header.Get("Content-Encoding")
8081
if !res.Uncompressed && rw.method.String() == enc {
8182
switch rw.method {
8283
case CompressionGZIP:
8384
reader := rw.reader.(*gzip.Reader)
84-
defer reader.Close()
8585
if err := reader.Reset(res.Body); err != nil {
8686
return nil, err
8787
}
88-
body, err := io.ReadAll(reader)
89-
if err != nil {
90-
return nil, err
91-
}
92-
return body, nil
88+
return reader, nil
9389
case CompressionDeflate:
94-
reader := rw.reader.(io.ReadCloser)
95-
defer reader.Close()
96-
if err := rw.reader.(flate.Resetter).Reset(res.Body, nil); err != nil {
97-
return nil, err
98-
}
99-
body, err := io.ReadAll(reader)
100-
if err != nil {
90+
reader := rw.reader
91+
if err := reader.(flate.Resetter).Reset(res.Body, nil); err != nil {
10192
return nil, err
10293
}
103-
return body, nil
94+
return reader, nil
10495
case CompressionBrotli:
10596
reader := rw.reader.(*brotli.Reader)
10697
if err := reader.Reset(res.Body); err != nil {
10798
return nil, err
10899
}
109-
body, err := io.ReadAll(reader)
110-
if err != nil {
111-
return nil, err
112-
}
113-
return body, nil
100+
return reader, nil
114101
}
115102
}
116-
body, err := io.ReadAll(res.Body)
117-
if err != nil {
118-
return nil, err
119-
}
120-
return body, nil
103+
return res.Body, nil
121104
}
122105

123106
func (rw *HTTPReaderWriter) reset(pw *io.PipeWriter) io.WriteCloser {
@@ -436,27 +419,21 @@ func (h *httpConnect) sendQuery(ctx context.Context, query string, options *Quer
436419

437420
func (h *httpConnect) readRawResponse(response *http.Response) (body []byte, err error) {
438421
rw := h.compressionPool.Get()
439-
defer response.Body.Close()
440422
defer h.compressionPool.Put(rw)
441-
if body, err = rw.read(response); err != nil {
423+
424+
reader, err := rw.NewReader(response)
425+
if err != nil {
442426
return nil, err
443427
}
444428
if h.compression == CompressionLZ4 || h.compression == CompressionZSTD {
445-
result := make([]byte, len(body))
446-
reader := chproto.NewReader(bytes.NewReader(body))
447-
reader.EnableCompression()
448-
defer reader.DisableCompression()
449-
for {
450-
b, err := reader.ReadByte()
451-
if err != nil {
452-
if errors.Is(err, io.EOF) {
453-
break
454-
}
455-
return nil, err
456-
}
457-
result = append(result, b)
458-
}
459-
return result, nil
429+
chReader := chproto.NewReader(reader)
430+
chReader.EnableCompression()
431+
reader = chReader
432+
}
433+
434+
body, err = io.ReadAll(reader)
435+
if err != nil {
436+
return nil, err
460437
}
461438
return body, nil
462439
}
@@ -549,14 +526,13 @@ func (h *httpConnect) executeRequest(req *http.Request) (*http.Response, error)
549526
if err != nil {
550527
return nil, err
551528
}
552-
if resp.StatusCode != http.StatusOK {
553529

530+
if resp.StatusCode != http.StatusOK {
531+
defer resp.Body.Close()
554532
msg, err := h.readRawResponse(resp)
555-
556533
if err != nil {
557534
return nil, fmt.Errorf("clickhouse [execute]:: %d code: failed to read the response: %w", resp.StatusCode, err)
558535
}
559-
560536
return nil, fmt.Errorf("clickhouse [execute]:: %d code: %s", resp.StatusCode, string(msg))
561537
}
562538
return resp, nil

conn_http_query.go

+36-35
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
package clickhouse
1919

2020
import (
21-
"bytes"
2221
"context"
2322
"errors"
23+
"io"
24+
2425
chproto "github.com/ClickHouse/ch-go/proto"
2526
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
26-
"io"
2727
)
2828

2929
// release is ignored, because http used by std with empty release function
@@ -50,52 +50,48 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
5050
if err != nil {
5151
return nil, err
5252
}
53-
defer res.Body.Close()
54-
// detect compression from http Content-Encoding header - note user will need to have set enable_http_compression
55-
// for CH to respond with compressed data - we don't set this automatically as they might not have permissions
56-
var body []byte
57-
//adding Accept-Encoding:gzip on your request means response won’t be automatically decompressed per https://github.com/golang/go/blob/master/src/net/http/transport.go#L182-L190
58-
59-
rw := h.compressionPool.Get()
60-
body, err = rw.read(res)
61-
bufferSize := h.blockBufferSize
62-
if options.blockBufferSize > 0 {
63-
// allow block buffer size to be overridden per query
64-
bufferSize = options.blockBufferSize
65-
}
66-
var (
67-
errCh = make(chan error)
68-
stream = make(chan *proto.Block, bufferSize)
69-
)
7053

71-
if len(body) == 0 {
72-
// queries with no results can get an empty body
73-
go func() {
74-
close(stream)
75-
close(errCh)
76-
}()
54+
if res.ContentLength == 0 {
55+
block := &proto.Block{}
7756
return &rows{
78-
err: nil,
79-
stream: stream,
80-
errors: errCh,
81-
block: &proto.Block{},
82-
columns: []string{},
57+
block: block,
58+
columns: block.ColumnsNames(),
8359
structMap: &structMap{},
8460
}, nil
8561
}
62+
63+
rw := h.compressionPool.Get()
64+
// HTTPReaderWriter.NewReader creates a reader that decompresses the response body if needed,
65+
// because adding Accept-Encoding:gzip to the request means the response won’t be automatically decompressed
66+
// per https://github.com/golang/go/blob/master/src/net/http/transport.go#L182-L190.
67+
// Note: the user must have set enable_http_compression for CH to respond with compressed data. We don't set this
68+
// automatically as they might not have permissions.
69+
reader, err := rw.NewReader(res)
8670
if err != nil {
71+
res.Body.Close()
72+
h.compressionPool.Put(rw)
8773
return nil, err
8874
}
89-
h.compressionPool.Put(rw)
90-
reader := chproto.NewReader(bytes.NewReader(body))
91-
block, err := h.readData(ctx, reader)
92-
if err != nil {
75+
chReader := chproto.NewReader(reader)
76+
block, err := h.readData(ctx, chReader)
77+
if err != nil && !errors.Is(err, io.EOF) {
78+
res.Body.Close()
79+
h.compressionPool.Put(rw)
9380
return nil, err
9481
}
9582

83+
bufferSize := h.blockBufferSize
84+
if options.blockBufferSize > 0 {
85+
// allow block buffer size to be overridden per query
86+
bufferSize = options.blockBufferSize
87+
}
88+
var (
89+
errCh = make(chan error)
90+
stream = make(chan *proto.Block, bufferSize)
91+
)
9692
go func() {
9793
for {
98-
block, err := h.readData(ctx, reader)
94+
block, err := h.readData(ctx, chReader)
9995
if err != nil {
10096
// ch-go wraps EOF errors
10197
if !errors.Is(err, io.EOF) {
@@ -110,10 +106,15 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
110106
case stream <- block:
111107
}
112108
}
109+
res.Body.Close()
110+
h.compressionPool.Put(rw)
113111
close(stream)
114112
close(errCh)
115113
}()
116114

115+
if block == nil {
116+
block = &proto.Block{}
117+
}
117118
return &rows{
118119
block: block,
119120
stream: stream,

0 commit comments

Comments
 (0)