Skip to content

Commit

Permalink
chore(schema): simple sink schema support
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Jan 13, 2025
1 parent 403dd18 commit a796124
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 31 deletions.
10 changes: 5 additions & 5 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,7 @@ func init() {
modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return urlencoded.NewConverter(props)
})
modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, props map[string]any) (message.ConvertWriter, error) {
modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
return delimited.NewCsvWriter(ctx, props)
})
}
Expand All @@ -65,11 +65,11 @@ func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string,
return nil, fmt.Errorf("format type %s not supported", t)
}

func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, props map[string]any) (message.ConvertWriter, error) {
func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
if cw, ok := modules.ConvertWriters[format]; ok {
return cw(ctx, schemaId, props)
return cw(ctx, schemaId, schema, props)
}
c, err := GetOrCreateConverter(ctx, format, schemaId, nil, props)
c, err := GetOrCreateConverter(ctx, format, schemaId, schema, props)
if err != nil {
return nil, err
}
Expand Down
20 changes: 19 additions & 1 deletion internal/server/rule_manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
// Copyright 2021-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@ package server
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -208,6 +210,7 @@ func (rr *RuleRegistry) DeleteRule(name string) error {
}
deleteRuleMetrics(name)
}
deleteRuleData(name)
return err
}

Expand Down Expand Up @@ -483,3 +486,18 @@ func deleteRuleMetrics(name string) {
promMetrics.RemoveRuleStatus(name)
}
}

func deleteRuleData(name string) {
dataLoc, err := conf.GetDataLoc()
if err != nil {
conf.Log.Errorf("delete rule data error: %v", err)
return
}
ruleDataPath := filepath.Join(dataLoc, "rule_"+name)
err = os.RemoveAll(ruleDataPath)
if err != nil {
conf.Log.Errorf("delete rule data error: %v", err)
} else {
conf.Log.Infof("delete rule data: %s", ruleDataPath)
}
}
26 changes: 25 additions & 1 deletion internal/topo/context/default.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -252,6 +252,30 @@ func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
}
}

func (c *DefaultContext) WithRuleId(ruleId string) api.StreamContext {
return &DefaultContext{
instanceId: c.instanceId,
ruleId: ruleId,
opId: c.opId,
ctx: c.ctx,
state: c.state,
isTraceEnabled: c.isTraceEnabled,
strategy: c.strategy,
}
}

func (c *DefaultContext) WithOpId(opId string) api.StreamContext {
return &DefaultContext{
instanceId: c.instanceId,
ruleId: c.ruleId,
opId: opId,
ctx: c.ctx,
state: c.state,
isTraceEnabled: c.isTraceEnabled,
strategy: c.strategy,
}
}

func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
ctx, cancel := context.WithCancel(c.ctx)
return &DefaultContext{
Expand Down
11 changes: 7 additions & 4 deletions internal/topo/node/batch_writer_op.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,9 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/converter"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/message"
)
Expand All @@ -38,12 +40,13 @@ type BatchWriterOp struct {
lastRow any
}

func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc *SinkConf) (*BatchWriterOp, error) {
c, err := converter.GetConvertWriter(ctx, sc.Format, sc.SchemaId, nil)
func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*BatchWriterOp, error) {
nctx := ctx.(*context.DefaultContext).WithOpId(name)
c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, schema, nil)
if err != nil {
return nil, err
}
err = c.New(ctx)
err = c.New(nctx)
if err != nil {
return nil, fmt.Errorf("writer fail to initialize new file: %s", err)
}
Expand Down
19 changes: 15 additions & 4 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,13 +76,23 @@ func PlanSQLWithSourcesAndSinks(rule *def.Rule, mockSourcesProp map[string]map[s
if err != nil {
return nil, err
}
tp, err := createTopo(rule, lp, mockSourcesProp, streamsFromStmt)
tp, err := createTopo(rule, lp, mockSourcesProp, streamsFromStmt, getSinkSchema(stmt))
if err != nil {
return nil, err
}
return tp, nil
}

func getSinkSchema(stmt *ast.SelectStatement) map[string]*ast.JsonStreamField {
schema := make(map[string]*ast.JsonStreamField, len(stmt.Fields))
for _, field := range stmt.Fields {
if field.GetName() != "*" {
schema[field.GetName()] = nil
}
}
return schema
}

func validateStmt(stmt *ast.SelectStatement) error {
var vErr error
ast.WalkFunc(stmt, func(n ast.Node) bool {
Expand All @@ -97,7 +107,7 @@ func validateStmt(stmt *ast.SelectStatement) error {
return vErr
}

func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[string]any, streamsFromStmt []string) (t *topo.Topo, err error) {
func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[string]any, streamsFromStmt []string, schema map[string]*ast.JsonStreamField) (t *topo.Topo, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.ExecutorError, err.Error())
Expand All @@ -117,7 +127,7 @@ func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[s
}
inputs := []node.Emitter{input}
// Add actions
err = buildActions(tp, rule, inputs, len(streamsFromStmt))
err = buildActions(tp, rule, inputs, len(streamsFromStmt), schema)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,6 +197,7 @@ func ExplainFromLogicalPlan(lp LogicalPlan, ruleID string) (string, error) {
return res, nil
}

// return the last schema if there are multiple sources
func buildOps(lp LogicalPlan, tp *topo.Topo, options *def.RuleOption, sources map[string]map[string]any, streamsFromStmt []string, index int) (node.Emitter, int, error) {
var inputs []node.Emitter
newIndex := index
Expand Down
4 changes: 2 additions & 2 deletions internal/topo/planner/planner_graph.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,7 +109,7 @@ func PlanByGraph(rule *def.Rule) (*topo.Topo, error) {
if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
return nil, fmt.Errorf("sink %s has edge", nodeName)
}
cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames))
cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames), nil)
if err != nil {
return nil, err
}
Expand Down
15 changes: 8 additions & 7 deletions internal/topo/planner/planner_sink.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,13 +26,14 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/topo"
"github.com/lf-edge/ekuiper/v2/internal/topo/node"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/model"
)

// SinkPlanner is the planner for sink node. It transforms logical sink plan to multiple physical nodes.
// It will split the sink plan into multiple sink nodes according to its sink configurations.

func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int) error {
func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int, schema map[string]*ast.JsonStreamField) error {
for i, m := range rule.Actions {
for name, action := range m {
props, ok := action.(map[string]any)
Expand All @@ -44,7 +45,7 @@ func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCo
return err
}
sinkName := fmt.Sprintf("%s_%d", name, i)
cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount)
cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount, schema)
if err != nil {
return err
}
Expand Down Expand Up @@ -75,7 +76,7 @@ func PlanSinkOps(tp *topo.Topo, inputs []node.Emitter, cn node.CompNode) {
}
}

func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int) (node.CompNode, error) {
func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int, schema map[string]*ast.JsonStreamField) (node.CompNode, error) {
s, _ := io.Sink(sinkType)
if s == nil {
return nil, fmt.Errorf("sink %s is not defined", sinkType)
Expand All @@ -86,7 +87,7 @@ func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[strin
}
templates := findTemplateProps(props)
// Split sink node
sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates)
sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates, schema)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +168,7 @@ func findTemplateProps(props map[string]any) []string {
}

// Split sink node according to the sink configuration. Return the new input emitters.
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string, schema map[string]*ast.JsonStreamField) ([]node.TopNode, error) {
index := 0
result := make([]node.TopNode, 0)
batchEnabled := sc.BatchSize > 0 || sc.LingerInterval > 0
Expand All @@ -189,7 +190,7 @@ func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOpti
index++
result = append(result, transformOp)
if batchEnabled {
batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, sc)
batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, schema, sc)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/planner/planner_sink_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestSinkPlan(t *testing.T) {
assert.NoError(t, err)
tp.AddSrc(n)
inputs := []node.Emitter{n}
err = buildActions(tp, c.rule, inputs, 1)
err = buildActions(tp, c.rule, inputs, 1, nil)
assert.NoError(t, err)
assert.Equal(t, c.topo, tp.GetTopo())
})
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestSinkPlanError(t *testing.T) {
assert.NoError(t, err)
tp.AddSrc(n)
inputs := []node.Emitter{n}
err = buildActions(tp, c.rule, inputs, 1)
err = buildActions(tp, c.rule, inputs, 1, nil)
assert.Error(t, err)
assert.Equal(t, c.err, err.Error())
})
Expand Down
5 changes: 3 additions & 2 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
// Copyright 2021-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -227,7 +227,8 @@ func (s *Topo) prepareContext() {
ctx := kctx.WithValue(kctx.RuleBackground(s.name), kctx.LoggerKey, contextLogger)
ctx = kctx.WithValue(ctx, kctx.RuleStartKey, timex.GetNowInMilli())
ctx = kctx.WithValue(ctx, kctx.RuleWaitGroupKey, s.opsWg)
s.ctx, s.cancel = ctx.WithCancel()
nctx := ctx.WithRuleId(s.name)
s.ctx, s.cancel = nctx.WithCancel()
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/modules/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ func IsFormatSupported(format string) bool {
// ConvertWriters are sink converter to use together with batch
var ConvertWriters = map[string]ConvertWriterProvider{}

type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, props map[string]any) (message.ConvertWriter, error)
type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error)

func RegisterWriterConverter(name string, provider ConvertWriterProvider) {
ConvertWriters[name] = provider
Expand Down

0 comments on commit a796124

Please sign in to comment.