Skip to content

Commit e7384c5

Browse files
committed
add logical plan distributed optimizer to query frontend
Signed-off-by: rubywtl <rubyharrisonlee@gmail.com>
1 parent 52b9672 commit e7384c5

15 files changed

+622
-10
lines changed

pkg/api/queryapi/query_api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import (
1616
"github.com/prometheus/prometheus/util/annotations"
1717
"github.com/prometheus/prometheus/util/httputil"
1818
v1 "github.com/prometheus/prometheus/web/api/v1"
19-
"github.com/thanos-io/promql-engine/logicalplan"
2019
"github.com/weaveworks/common/httpgrpc"
2120

21+
"github.com/cortexproject/cortex/pkg/distributed_execution"
2222
"github.com/cortexproject/cortex/pkg/engine"
2323
"github.com/cortexproject/cortex/pkg/querier"
2424
"github.com/cortexproject/cortex/pkg/util"
@@ -110,7 +110,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
110110

111111
byteLP := []byte(r.PostFormValue("plan"))
112112
if len(byteLP) != 0 {
113-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
113+
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
114114
if err != nil {
115115
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
116116
}
@@ -183,7 +183,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
183183

184184
byteLP := []byte(r.PostFormValue("plan"))
185185
if len(byteLP) != 0 {
186-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
186+
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
187187
if err != nil {
188188
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
189189
}

pkg/cortex/modules.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
541541
t.Cfg.Querier.LookbackDelta,
542542
t.Cfg.Querier.DefaultEvaluationInterval,
543543
t.Cfg.Querier.DistributedExecEnabled,
544+
t.Cfg.Querier.ThanosEngine.LogicalOptimizers,
544545
)
545546
if err != nil {
546547
return nil, err
@@ -553,7 +554,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
553554
queryAnalyzer,
554555
t.Cfg.Querier.LookbackDelta,
555556
t.Cfg.Querier.DefaultEvaluationInterval,
556-
t.Cfg.Querier.DistributedExecEnabled)
557+
t.Cfg.Querier.DistributedExecEnabled,
558+
t.Cfg.Querier.ThanosEngine.LogicalOptimizers)
557559
if err != nil {
558560
return nil, err
559561
}

pkg/distributed_execution/codec.go

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package distributed_execution
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"math"
7+
8+
"github.com/prometheus/prometheus/model/labels"
9+
"github.com/thanos-io/promql-engine/logicalplan"
10+
)
11+
12+
type jsonNode struct {
13+
Type logicalplan.NodeType `json:"type"`
14+
Data json.RawMessage `json:"data"`
15+
Children []json.RawMessage `json:"children,omitempty"`
16+
}
17+
18+
const (
19+
nanVal = `"NaN"`
20+
infVal = `"+Inf"`
21+
negInfVal = `"-Inf"`
22+
)
23+
24+
// Unmarshal deserializes a logical plan node from JSON data.
25+
// This is a custom implementation for Cortex that is copied from Thanos engine's unmarshaling func
26+
// to support remote nodes. We maintain this separate implementation because Thanos engine's
27+
// logical plan codec currently doesn't support custom node types in its unmarshaling process.
28+
func Unmarshal(data []byte) (logicalplan.Node, error) {
29+
return unmarshalNode(data)
30+
}
31+
32+
func unmarshalNode(data []byte) (logicalplan.Node, error) {
33+
t := jsonNode{}
34+
if err := json.Unmarshal(data, &t); err != nil {
35+
return nil, err
36+
}
37+
38+
switch t.Type {
39+
case logicalplan.VectorSelectorNode:
40+
v := &logicalplan.VectorSelector{}
41+
if err := json.Unmarshal(t.Data, v); err != nil {
42+
return nil, err
43+
}
44+
var err error
45+
for i, m := range v.LabelMatchers {
46+
v.LabelMatchers[i], err = labels.NewMatcher(m.Type, m.Name, m.Value)
47+
if err != nil {
48+
return nil, err
49+
}
50+
}
51+
return v, nil
52+
case logicalplan.MatrixSelectorNode:
53+
m := &logicalplan.MatrixSelector{}
54+
if err := json.Unmarshal(t.Data, m); err != nil {
55+
return nil, err
56+
}
57+
vs, err := unmarshalNode(t.Children[0])
58+
if err != nil {
59+
return nil, err
60+
}
61+
m.VectorSelector = vs.(*logicalplan.VectorSelector)
62+
return m, nil
63+
case logicalplan.AggregationNode:
64+
a := &logicalplan.Aggregation{}
65+
if err := json.Unmarshal(t.Data, a); err != nil {
66+
return nil, err
67+
}
68+
var err error
69+
a.Expr, err = unmarshalNode(t.Children[0])
70+
if err != nil {
71+
return nil, err
72+
}
73+
if len(t.Children) > 1 {
74+
a.Param, err = unmarshalNode(t.Children[1])
75+
if err != nil {
76+
return nil, err
77+
}
78+
}
79+
return a, nil
80+
case logicalplan.BinaryNode:
81+
b := &logicalplan.Binary{}
82+
if err := json.Unmarshal(t.Data, b); err != nil {
83+
return nil, err
84+
}
85+
var err error
86+
b.LHS, err = unmarshalNode(t.Children[0])
87+
if err != nil {
88+
return nil, err
89+
}
90+
b.RHS, err = unmarshalNode(t.Children[1])
91+
if err != nil {
92+
return nil, err
93+
}
94+
return b, nil
95+
case logicalplan.FunctionNode:
96+
f := &logicalplan.FunctionCall{}
97+
if err := json.Unmarshal(t.Data, f); err != nil {
98+
return nil, err
99+
}
100+
for _, c := range t.Children {
101+
child, err := unmarshalNode(c)
102+
if err != nil {
103+
return nil, err
104+
}
105+
f.Args = append(f.Args, child)
106+
}
107+
return f, nil
108+
case logicalplan.NumberLiteralNode:
109+
n := &logicalplan.NumberLiteral{}
110+
if bytes.Equal(t.Data, []byte(infVal)) {
111+
n.Val = math.Inf(1)
112+
} else if bytes.Equal(t.Data, []byte(negInfVal)) {
113+
n.Val = math.Inf(-1)
114+
} else if bytes.Equal(t.Data, []byte(nanVal)) {
115+
n.Val = math.NaN()
116+
} else {
117+
if err := json.Unmarshal(t.Data, n); err != nil {
118+
return nil, err
119+
}
120+
}
121+
return n, nil
122+
case logicalplan.StringLiteralNode:
123+
s := &logicalplan.StringLiteral{}
124+
if err := json.Unmarshal(t.Data, s); err != nil {
125+
return nil, err
126+
}
127+
return s, nil
128+
case logicalplan.SubqueryNode:
129+
s := &logicalplan.Subquery{}
130+
if err := json.Unmarshal(t.Data, s); err != nil {
131+
return nil, err
132+
}
133+
var err error
134+
s.Expr, err = unmarshalNode(t.Children[0])
135+
if err != nil {
136+
return nil, err
137+
}
138+
return s, nil
139+
case logicalplan.CheckDuplicateNode:
140+
c := &logicalplan.CheckDuplicateLabels{}
141+
if err := json.Unmarshal(t.Data, c); err != nil {
142+
return nil, err
143+
}
144+
var err error
145+
c.Expr, err = unmarshalNode(t.Children[0])
146+
if err != nil {
147+
return nil, err
148+
}
149+
return c, nil
150+
case logicalplan.StepInvariantNode:
151+
s := &logicalplan.StepInvariantExpr{}
152+
if err := json.Unmarshal(t.Data, s); err != nil {
153+
return nil, err
154+
}
155+
var err error
156+
s.Expr, err = unmarshalNode(t.Children[0])
157+
if err != nil {
158+
return nil, err
159+
}
160+
return s, nil
161+
case logicalplan.ParensNode:
162+
p := &logicalplan.Parens{}
163+
if err := json.Unmarshal(t.Data, p); err != nil {
164+
return nil, err
165+
}
166+
var err error
167+
p.Expr, err = unmarshalNode(t.Children[0])
168+
if err != nil {
169+
return nil, err
170+
}
171+
return p, nil
172+
case logicalplan.UnaryNode:
173+
u := &logicalplan.Unary{}
174+
if err := json.Unmarshal(t.Data, u); err != nil {
175+
return nil, err
176+
}
177+
var err error
178+
u.Expr, err = unmarshalNode(t.Children[0])
179+
if err != nil {
180+
return nil, err
181+
}
182+
return u, nil
183+
case RemoteNode:
184+
r := &Remote{}
185+
if err := json.Unmarshal(t.Data, r); err != nil {
186+
return nil, err
187+
}
188+
var err error
189+
r.Expr, err = unmarshalNode(t.Children[0])
190+
if err != nil {
191+
return nil, err
192+
}
193+
return r, nil
194+
}
195+
return nil, nil
196+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package distributed_execution
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
"github.com/thanos-io/promql-engine/logicalplan"
9+
)
10+
11+
func TestUnmarshalWithLogicalPlan(t *testing.T) {
12+
t.Run("unmarshal complex query plan", func(t *testing.T) {
13+
start := time.Now()
14+
end := start.Add(1 * time.Hour)
15+
step := 15 * time.Second
16+
17+
testCases := []struct {
18+
name string
19+
query string
20+
}{
21+
{
22+
name: "binary operation",
23+
query: "http_requests_total + rate(node_cpu_seconds_total[5m])",
24+
},
25+
{
26+
name: "aggregation",
27+
query: "sum(rate(http_requests_total[5m])) by (job)",
28+
},
29+
{
30+
name: "complex query",
31+
query: "sum(rate(http_requests_total{job='prometheus'}[5m])) by (job) / sum(rate(node_cpu_seconds_total[5m])) by (job)",
32+
},
33+
}
34+
35+
for _, tc := range testCases {
36+
t.Run(tc.name, func(t *testing.T) {
37+
plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step)
38+
require.NoError(t, err)
39+
require.NotNil(t, plan)
40+
41+
data, err := logicalplan.Marshal((*plan).Root())
42+
require.NoError(t, err)
43+
44+
node, err := Unmarshal(data)
45+
require.NoError(t, err)
46+
require.NotNil(t, node)
47+
48+
// the logical plan node before and after marshal/unmarshal should be the same
49+
verifyNodeStructure(t, (*plan).Root(), node)
50+
})
51+
}
52+
})
53+
}
54+
55+
func verifyNodeStructure(t *testing.T, expected logicalplan.Node, actual logicalplan.Node) {
56+
require.Equal(t, expected.Type(), actual.Type())
57+
require.Equal(t, expected.String(), actual.String())
58+
require.Equal(t, expected.ReturnType(), actual.ReturnType())
59+
60+
expectedChildren := expected.Children()
61+
actualChildren := actual.Children()
62+
63+
require.Equal(t, len(expectedChildren), len(actualChildren))
64+
65+
for i := 0; i < len(expectedChildren); i++ {
66+
if expectedChildren[i] != nil && actualChildren[i] != nil {
67+
verifyNodeStructure(t, *expectedChildren[i], *actualChildren[i])
68+
}
69+
}
70+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package distributed_execution
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/prometheus/prometheus/util/annotations"
7+
"github.com/thanos-io/promql-engine/logicalplan"
8+
)
9+
10+
// This is a simplified implementation that only handles binary aggregation cases
11+
// Future versions of the distributed optimizer are expected to:
12+
// - Support more complex query patterns
13+
// - Incorporate diverse optimization strategies
14+
// - Extend support to node types beyond binary operations
15+
16+
type DistributedOptimizer struct{}
17+
18+
func (d *DistributedOptimizer) Optimize(root logicalplan.Node) (logicalplan.Node, annotations.Annotations, error) {
19+
warns := annotations.New()
20+
21+
if root == nil {
22+
return nil, *warns, fmt.Errorf("nil root node")
23+
}
24+
25+
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
26+
27+
if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) {
28+
ch := (*current).Children()
29+
30+
for _, child := range ch {
31+
temp := (*child).Clone()
32+
*child = NewRemoteNode(temp)
33+
*(*child).Children()[0] = temp
34+
}
35+
}
36+
37+
return false
38+
})
39+
40+
return root, *warns, nil
41+
}
42+
43+
func (d *DistributedOptimizer) hasAggregation(root *logicalplan.Node) bool {
44+
isAggr := false
45+
logicalplan.TraverseBottomUp(nil, root, func(parent, current *logicalplan.Node) bool {
46+
if (*current).Type() == logicalplan.AggregationNode {
47+
isAggr = true
48+
return true
49+
}
50+
return false
51+
})
52+
return isAggr
53+
}

0 commit comments

Comments
 (0)