Skip to content

Commit

Permalink
Merge pull request #2947 from timeplus-io/feature/timeplus-input
Browse files Browse the repository at this point in the history
feat: added timeplus input
  • Loading branch information
Jeffail authored Oct 25, 2024
2 parents 5341963 + 91ab227 commit dc6bbb3
Show file tree
Hide file tree
Showing 11 changed files with 900 additions and 24 deletions.
167 changes: 167 additions & 0 deletions docs/modules/components/pages/inputs/timeplus.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
= timeplus
:type: input
:status: experimental
:categories: ["Services"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
// © 2024 Redpanda Data Inc.
component_type_dropdown::[]
Executes a query on Timeplus Enterprise and creates a message from each row received
```yml
# Config fields, showing default values
input:
label: ""
timeplus:
query: select * from iot # No default (required)
url: tcp://localhost:8463
workspace: "" # No default (optional)
apikey: "" # No default (optional)
username: "" # No default (optional)
password: "" # No default (optional)
```
This input can execute a query on Timeplus Enterprise Cloud, Timeplus Enterprise (self-hosted) or Timeplusd. A structured message will be created
from each row received.
If it is a streaming query, this input will keep running until the query is terminated. If it is a table query, this input will shut down once the rows from the query are exhausted.
== Examples
[tabs]
======
From Timeplus Enterprise Cloud via HTTP::
+
--
You will need to create API Key on Timeplus Enterprise Cloud Web console first and then set the `apikey` field.
```yaml
input:
timeplus:
url: https://us-west-2.timeplus.cloud
workspace: my_workspace_id
query: select * from iot
apikey: <Your API Key>```
--
From Timeplus Enterprise (self-hosted) via HTTP::
+
--
For self-housted Timeplus Enterprise, you will need to specify the username and password as well as the URL of the App server
```yaml
input:
timeplus:
url: http://localhost:8000
workspace: my_workspace_id
query: select * from iot
username: username
password: pw```
--
From Timeplus Enterprise (self-hosted) via TCP::
+
--
Make sure the the schema of url is tcp
```yaml
input:
timeplus:
url: tcp://localhost:8463
query: select * from iot
username: timeplus
password: timeplus```
--
======
== Fields
=== `query`
The query to run
*Type*: `string`
```yml
# Examples
query: select * from iot
query: select count(*) from table(iot)
```
=== `url`
The url should always include schema and host.
*Type*: `string`
*Default*: `"tcp://localhost:8463"`
=== `workspace`
ID of the workspace. Required when reads from Timeplus Enterprise.
*Type*: `string`
=== `apikey`
The API key. Required when reads from Timeplus Enterprise Cloud
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====
*Type*: `string`
=== `username`
The username. Required when reads from Timeplus Enterprise (self-hosted) or Timeplusd
*Type*: `string`
=== `password`
The password. Required when reads from Timeplus Enterprise (self-hosted) or Timeplusd
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====
*Type*: `string`
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/ollama v0.32.0
github.com/testcontainers/testcontainers-go/modules/qdrant v0.32.0
github.com/tetratelabs/wazero v1.7.3
github.com/timeplus-io/proton-go-driver/v2 v2.0.17
github.com/trinodb/trino-go-client v0.315.0
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,8 @@ github.com/tetratelabs/wazero v1.7.3/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXA
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
github.com/timeplus-io/proton-go-driver/v2 v2.0.17 h1:rXPT21/9FgQYFntSgLvJRL/7pgPAfTXWIZKp5UG+vQ0=
github.com/timeplus-io/proton-go-driver/v2 v2.0.17/go.mod h1:rUs4zvXvKsmuyFpzdJnnid6p8IvRJTa/n/jNQ2B6Dfw=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
Expand Down
158 changes: 158 additions & 0 deletions internal/impl/timeplus/driver/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package driver

import (
"context"
"database/sql"
"errors"
"io"
"regexp"
"strconv"
"strings"
"time"

"github.com/redpanda-data/benthos/v4/public/service"
protonDriver "github.com/timeplus-io/proton-go-driver/v2"
)

type driver struct {
logger *service.Logger
conn *sql.DB
rows *sql.Rows
columnTypes []*sql.ColumnType

ctx context.Context
cancel context.CancelFunc
}

var (
codeRe = *regexp.MustCompile(`code: (.+[0-9])`)
msgRe = *regexp.MustCompile(`message: (.*)`)
)

// NewDriver creates a new proton driver.
func NewDriver(logger *service.Logger, addr, username, password string) *driver {
conn := protonDriver.OpenDB(&protonDriver.Options{
Addr: []string{addr},
Auth: protonDriver.Auth{
Username: username,
Password: password,
},
DialTimeout: 5 * time.Second,
})

return &driver{
logger: logger,
conn: conn,
}
}

// Run starts a query.
func (d *driver) Run(sql string) error {
d.ctx, d.cancel = context.WithCancel(context.Background())
ckCtx := protonDriver.Context(d.ctx)

//nolint
rows, err := d.conn.QueryContext(ckCtx, sql)
if err != nil {
return err
}

columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}

d.rows = rows
d.columnTypes = columnTypes

return nil
}

// Read reads one row.
func (d *driver) Read(ctx context.Context) (map[string]any, error) {
for { // retry loop
if d.rows.Next() {
count := len(d.columnTypes)

values := make([]any, count)
valuePtrs := make([]any, count)

for i := range d.columnTypes {
valuePtrs[i] = &values[i]
}

if err := d.rows.Scan(valuePtrs...); err != nil {
return nil, err
}

event := make(map[string]any)
for i, col := range d.columnTypes {
event[col.Name()] = values[i]
}

return event, nil
}

if err := d.rows.Err(); err != nil {
if isQueryCancelErr(err) {
// Most likely timeplusd got restarted. Since we are going to re-connect to timeplusd once it recovered, we do not log it as error for now.
d.logger.With("reason", err).Info("query cancelled")
return nil, io.EOF
}
if errors.Is(err, context.Canceled) {
return nil, err
}

d.logger.With("error", err).Errorf("query failed: %s", err.Error())
// this happens when the SQL is updated, i.e. a new MV is created, the previous checkpoint is on longer available.
if strings.Contains(err.Error(), "code: 2003") {
continue // retry
}
return nil, err
}

return nil, io.EOF
}
}

// Close terminates the running query.
func (d *driver) Close(context.Context) error {
d.cancel()

if err := d.rows.Close(); err != nil {
if !errors.Is(err, context.Canceled) {
return err
}
}

if err := d.rows.Err(); err != nil {
if !errors.Is(err, context.Canceled) {
return err
}
}

return d.conn.Close()
}

func isQueryCancelErr(err error) bool {
code, msg := parse(err)
return code == 394 && strings.Contains(msg, "Query was cancelled")
}

func parse(err error) (int, string) {
var code int
var msg string

errStr := err.Error()
codeMatches := codeRe.FindStringSubmatch(errStr)
if len(codeMatches) == 2 {
code, _ = strconv.Atoi(codeMatches[1])
}

msgMatches := msgRe.FindStringSubmatch(errStr)
if len(msgMatches) == 2 {
msg = msgMatches[1]
}

return code, msg
}
Loading

0 comments on commit dc6bbb3

Please sign in to comment.