Skip to content

Commit 0fdf24e

Browse files
support streaming insert;
block in streaming_buffer can be replaced by data generated outside.
1 parent 473da51 commit 0fdf24e

File tree

5 files changed

+377
-0
lines changed

5 files changed

+377
-0
lines changed

clickhouse.go

+9
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type (
4040
)
4141

4242
var (
43+
ErrStreamingBufferClosed = errors.New("proton: streaming buffer has already been closed")
4344
ErrBatchAlreadySent = errors.New("proton: batch has already been sent")
4445
ErrAcquireConnTimeout = errors.New("proton: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
4546
ErrUnsupportedServerRevision = errors.New("proton: unsupported server revision")
@@ -146,6 +147,14 @@ func (ch *proton) PrepareBatch(ctx context.Context, query string) (driver.Batch,
146147
return conn.prepareBatch(ctx, query, ch.release)
147148
}
148149

150+
func (ch *proton) PrepareStreamingBuffer(ctx context.Context, query string) (driver.StreamingBuffer, error) {
151+
conn, err := ch.acquire(ctx)
152+
if err != nil {
153+
return nil, err
154+
}
155+
return conn.prepareStreamingBuffer(ctx, query, ch.release)
156+
}
157+
149158
func (ch *proton) AsyncInsert(ctx context.Context, query string, wait bool) error {
150159
conn, err := ch.acquire(ctx)
151160
if err != nil {

conn_streaming_buffer.go

+203
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package proton
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/timeplus-io/proton-go-driver/v2/lib/column"
13+
"github.com/timeplus-io/proton-go-driver/v2/lib/driver"
14+
"github.com/timeplus-io/proton-go-driver/v2/lib/proto"
15+
)
16+
17+
func (c *connect) prepareStreamingBuffer(ctx context.Context, query string, release func(*connect, error)) (*streamingBuffer, error) {
18+
query = splitInsertRe.Split(query, -1)[0]
19+
if !strings.HasSuffix(strings.TrimSpace(strings.ToUpper(query)), "VALUES") {
20+
query += " VALUES"
21+
}
22+
options := queryOptions(ctx)
23+
if deadline, ok := ctx.Deadline(); ok {
24+
c.conn.SetDeadline(deadline)
25+
defer c.conn.SetDeadline(time.Time{})
26+
}
27+
if err := c.sendQuery(query, &options); err != nil {
28+
release(c, err)
29+
return nil, err
30+
}
31+
var (
32+
onProcess = options.onProcess()
33+
block, err = c.firstBlock(ctx, onProcess)
34+
)
35+
if err != nil {
36+
release(c, err)
37+
return nil, err
38+
}
39+
return &streamingBuffer{
40+
ctx: ctx,
41+
conn: c,
42+
block: block,
43+
release: func(err error) {
44+
release(c, err)
45+
},
46+
onProcess: onProcess,
47+
done: make(chan struct{}),
48+
}, nil
49+
}
50+
51+
type streamingBuffer struct {
52+
err error
53+
ctx context.Context
54+
conn *connect
55+
sent bool
56+
block *proto.Block
57+
release func(error)
58+
onProcess *onProcess
59+
once sync.Once
60+
done chan struct{}
61+
}
62+
63+
func (b *streamingBuffer) Append(v ...interface{}) error {
64+
if b.sent {
65+
return ErrStreamingBufferClosed
66+
}
67+
if err := b.block.Append(v...); err != nil {
68+
b.release(err)
69+
return err
70+
}
71+
return nil
72+
}
73+
74+
func (b *streamingBuffer) AppendStruct(v interface{}) error {
75+
values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false)
76+
if err != nil {
77+
return err
78+
}
79+
return b.Append(values...)
80+
}
81+
82+
func (b *streamingBuffer) Column(idx int) driver.StreamingBufferColumn {
83+
if len(b.block.Columns) <= idx {
84+
b.release(nil)
85+
return &streamingBufferColumn{
86+
err: &OpError{
87+
Op: "streamingBuffer.Column",
88+
Err: fmt.Errorf("invalid column index %d", idx),
89+
},
90+
}
91+
}
92+
return &streamingBufferColumn{
93+
buffer: b,
94+
column: b.block.Columns[idx],
95+
release: func(err error) {
96+
b.err = err
97+
b.release(err)
98+
},
99+
}
100+
}
101+
102+
func (b *streamingBuffer) Close() (err error) {
103+
defer func() {
104+
b.sent = true
105+
b.release(err)
106+
}()
107+
if err = b.Send(); err != nil {
108+
return err
109+
}
110+
if err = b.conn.sendData(&proto.Block{}, ""); err != nil {
111+
return err
112+
}
113+
if err = b.conn.encoder.Flush(); err != nil {
114+
return err
115+
}
116+
<-b.done
117+
return nil
118+
}
119+
120+
func (b *streamingBuffer) Clear() (err error) {
121+
for i := range b.block.Columns {
122+
b.block.Columns[i], err = b.block.Columns[i].Type().Column()
123+
if err != nil {
124+
return
125+
}
126+
}
127+
return nil
128+
}
129+
130+
func (b *streamingBuffer) Send() (err error) {
131+
if b.sent {
132+
return ErrStreamingBufferClosed
133+
}
134+
if b.err != nil {
135+
return b.err
136+
}
137+
if b.block.Rows() != 0 {
138+
if err = b.conn.sendData(b.block, ""); err != nil {
139+
return err
140+
}
141+
}
142+
if err = b.conn.encoder.Flush(); err != nil {
143+
return err
144+
}
145+
b.once.Do(func() {
146+
go func() {
147+
if err = b.conn.process(b.ctx, b.onProcess); err != nil {
148+
log.Fatal(err)
149+
}
150+
b.done <- struct{}{}
151+
}()
152+
})
153+
if err = b.Clear(); err != nil {
154+
return err
155+
}
156+
return nil
157+
}
158+
159+
func (b *streamingBuffer) ReplaceBy(cols ...column.Interface) (err error) {
160+
if len(b.block.Columns) != len(cols) {
161+
return errors.New(fmt.Sprintf("colomn number is %d, not %d", len(b.block.Columns), len(cols)))
162+
}
163+
for i := 0; i < len(cols); i++ {
164+
if b.block.Columns[i].Type() != cols[i].Type() {
165+
return errors.New(fmt.Sprintf("type of colomn[%d] is %s, not %s", i, b.block.Columns[i].Type(), cols[i].Type()))
166+
}
167+
}
168+
rows := cols[0].Rows()
169+
for i := 1; i < len(cols); i++ {
170+
if rows != cols[i].Rows() {
171+
return errors.New("cols with different length")
172+
}
173+
}
174+
b.block.Columns = cols
175+
return nil
176+
}
177+
178+
type streamingBufferColumn struct {
179+
err error
180+
buffer *streamingBuffer
181+
column column.Interface
182+
release func(error)
183+
}
184+
185+
func (b *streamingBufferColumn) Append(v interface{}) (err error) {
186+
if b.buffer.sent {
187+
return ErrStreamingBufferClosed
188+
}
189+
if b.err != nil {
190+
b.release(b.err)
191+
return b.err
192+
}
193+
if _, err = b.column.Append(v); err != nil {
194+
b.release(err)
195+
return err
196+
}
197+
return nil
198+
}
199+
200+
var (
201+
_ (driver.StreamingBuffer) = (*streamingBuffer)(nil)
202+
_ (driver.StreamingBufferColumn) = (*streamingBufferColumn)(nil)
203+
)
+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/timeplus-io/proton-go-driver/v2"
7+
"github.com/timeplus-io/proton-go-driver/v2/lib/column"
8+
"log"
9+
"time"
10+
)
11+
12+
func BufferReplaceExample() {
13+
var (
14+
ctx = context.Background()
15+
conn, err = proton.Open(&proton.Options{
16+
Addr: []string{"127.0.0.1:8463"},
17+
Auth: proton.Auth{
18+
Database: "default",
19+
Username: "default",
20+
Password: "",
21+
},
22+
//Debug: true,
23+
DialTimeout: time.Second,
24+
MaxOpenConns: 10,
25+
MaxIdleConns: 5,
26+
ConnMaxLifetime: time.Hour,
27+
})
28+
)
29+
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
30+
fmt.Println("progress: ", p)
31+
}))
32+
if err != nil {
33+
log.Fatal(err)
34+
}
35+
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
36+
log.Fatal(err)
37+
}
38+
err = conn.Exec(ctx, `
39+
CREATE STREAM IF NOT EXISTS example (
40+
Col1 uint64
41+
, Col2 string
42+
)
43+
`)
44+
if err != nil {
45+
log.Fatal(err)
46+
}
47+
const rows = 200_000
48+
var (
49+
col1 column.UInt64 = make([]uint64, rows)
50+
col2 column.String = make([]string, rows)
51+
)
52+
for i := 0; i < rows; i++ {
53+
col1[i] = uint64(i)
54+
col2[i] = fmt.Sprintf("num%03d", i)
55+
}
56+
buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)")
57+
err = buffer.ReplaceBy(
58+
&col1,
59+
&col2,
60+
)
61+
if err != nil {
62+
return
63+
}
64+
err = buffer.Send()
65+
if err != nil {
66+
return
67+
}
68+
err = buffer.Close()
69+
if err != nil {
70+
return
71+
}
72+
}
73+
74+
func main() {
75+
BufferReplaceExample()
76+
}
+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/timeplus-io/proton-go-driver/v2"
10+
)
11+
12+
func example() error {
13+
var (
14+
ctx = context.Background()
15+
conn, err = proton.Open(&proton.Options{
16+
Addr: []string{"127.0.0.1:8463"},
17+
Auth: proton.Auth{
18+
Database: "default",
19+
Username: "default",
20+
Password: "",
21+
},
22+
// Debug: true,
23+
DialTimeout: time.Second,
24+
MaxOpenConns: 10,
25+
MaxIdleConns: 5,
26+
ConnMaxLifetime: time.Hour,
27+
})
28+
)
29+
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
30+
fmt.Println("progress: ", p)
31+
}))
32+
if err != nil {
33+
return err
34+
}
35+
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
36+
return err
37+
}
38+
err = conn.Exec(ctx, `
39+
CREATE STREAM IF NOT EXISTS example (
40+
Col1 uint64
41+
, Col2 string
42+
)
43+
`)
44+
if err != nil {
45+
return err
46+
}
47+
48+
buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)")
49+
if err != nil {
50+
return err
51+
}
52+
for i := 0; i < 100; i++ {
53+
for j := 0; j < 100000; j++ {
54+
err := buffer.Append(
55+
uint64(i*100000+j),
56+
fmt.Sprintf("num_%d_%d", j, i),
57+
)
58+
if err != nil {
59+
return err
60+
}
61+
}
62+
if err := buffer.Send(); err != nil {
63+
return err
64+
}
65+
}
66+
return buffer.Close()
67+
}
68+
69+
func main() {
70+
start := time.Now()
71+
if err := example(); err != nil {
72+
log.Fatal(err)
73+
}
74+
fmt.Println(time.Since(start))
75+
}

0 commit comments

Comments
 (0)