Skip to content

Commit 41b435c

Browse files
committed
feat: add storage class allowedTopologies
Check the nodes for storage class topology keys subsets. Signed-off-by: Serge Logvinov <serge.logvinov@sinextra.dev>
1 parent d209f05 commit 41b435c

File tree

5 files changed

+235
-5
lines changed

5 files changed

+235
-5
lines changed

.golangci.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ output:
2828
path: stdout
2929
print-issued-lines: true
3030
print-linter-name: true
31-
uniq-by-line: true
3231
sort-results: true
3332

3433
# all available settings of specific linters
@@ -175,6 +174,8 @@ issues:
175174
exclude-dirs:
176175
# copied from kubernetes repo
177176
- pkg/csi
177+
exclude-files:
178+
- pkg/provisioner/topology.go
178179

179180
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
180181
max-issues-per-linter: 0

docs/deploy/csidriver.yaml

+22-2
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,34 @@ spec:
1010
volumeLifecycleModes:
1111
- Persistent
1212
---
13-
# Source: hybrid-csi-plugin/templates/storageclass.yaml
1413
apiVersion: storage.k8s.io/v1
1514
kind: StorageClass
1615
metadata:
1716
name: hybrid
1817
provisioner: csi.hybrid.sinextra.dev
1918
parameters:
20-
storageClasses: proxmox,hcloud-volumes,local-path
19+
storageClasses: proxmox-test,local-path
2120
allowVolumeExpansion: true
2221
volumeBindingMode: WaitForFirstConsumer
2322
reclaimPolicy: Delete
23+
---
24+
apiVersion: storage.k8s.io/v1
25+
kind: StorageClass
26+
metadata:
27+
name: proxmox-test
28+
annotations:
29+
resize.topolvm.io/enabled: "true"
30+
provisioner: csi.proxmox.sinextra.dev
31+
parameters:
32+
storage: lvm
33+
allowVolumeExpansion: true
34+
reclaimPolicy: Delete
35+
volumeBindingMode: WaitForFirstConsumer
36+
allowedTopologies:
37+
- matchLabelExpressions:
38+
- key: topology.kubernetes.io/region
39+
values:
40+
- fsn1
41+
- key: topology.kubernetes.io/zone
42+
values:
43+
- hvm-1

docs/deploy/test-pod-ephemeral.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ spec:
77
tolerations:
88
- effect: NoSchedule
99
key: node-role.kubernetes.io/control-plane
10-
# nodeSelector:
11-
# kubernetes.io/hostname: controlplane-fsn1b
10+
nodeSelector:
11+
kubernetes.io/hostname: kube-02a
1212
containers:
1313
- name: alpine
1414
image: alpine

pkg/provisioner/provisioner.go

+27
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,33 @@ func (p *HybridProvisioner) getStorageClassFromNode(selectedNode *corev1.Node, s
162162
continue
163163
}
164164

165+
if len(class.AllowedTopologies) > 0 {
166+
topologyKeys := getTopologyKeys(selectedCSINode, class.Provisioner)
167+
168+
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
169+
if isMissingKey {
170+
klog.V(5).InfoS("getTopologyFromNode key is missing", "node", klog.KObj(selectedNode), "storageClass", storageClass)
171+
172+
continue
173+
}
174+
175+
allowedTopologiesFlatten := flatten(class.AllowedTopologies)
176+
177+
found := false
178+
for _, t := range allowedTopologiesFlatten {
179+
if t.subset(selectedTopology) {
180+
found = true
181+
break
182+
}
183+
}
184+
185+
if !found {
186+
klog.V(4).InfoS("topology is not in allowed", "node", klog.KObj(selectedNode), "storageClass", storageClass, "topology", selectedTopology)
187+
188+
continue
189+
}
190+
}
191+
165192
if driver, err := p.driverLister.Get(class.Provisioner); err != nil || driver == nil {
166193
// Provisioner is not a CSI driver
167194
return class, nil // nolint: nilerr

pkg/provisioner/topology.go

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package provisioner // copy from external-provisioner/pkg/controller/topology.go
18+
19+
import (
20+
"fmt"
21+
"slices"
22+
"strings"
23+
24+
"github.com/container-storage-interface/spec/lib/go/csi"
25+
26+
v1 "k8s.io/api/core/v1"
27+
storagev1 "k8s.io/api/storage/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/labels"
30+
)
31+
32+
type topologySegment struct {
33+
Key, Value string
34+
}
35+
36+
// topologyTerm represents a single term where its topology key value pairs are AND'd together.
37+
//
38+
// Be sure to sort after construction for compare() and subset() to work properly.
39+
type topologyTerm []topologySegment
40+
41+
func flatten(allowedTopologies []v1.TopologySelectorTerm) []topologyTerm {
42+
var finalTerms []topologyTerm
43+
for _, selectorTerm := range allowedTopologies { // OR
44+
45+
var oldTerms []topologyTerm
46+
for _, selectorExpression := range selectorTerm.MatchLabelExpressions { // AND
47+
48+
var newTerms []topologyTerm
49+
for _, v := range selectorExpression.Values { // OR
50+
// Distribute the OR over AND.
51+
52+
if len(oldTerms) == 0 {
53+
// No previous terms to distribute over. Simply append the new term.
54+
newTerms = append(newTerms, topologyTerm{{selectorExpression.Key, v}})
55+
} else {
56+
for _, oldTerm := range oldTerms {
57+
// "Distribute" by adding an entry to the term
58+
newTerm := slices.Clone(oldTerm)
59+
newTerm = append(newTerm, topologySegment{selectorExpression.Key, v})
60+
newTerms = append(newTerms, newTerm)
61+
}
62+
}
63+
}
64+
65+
oldTerms = newTerms
66+
}
67+
68+
// Concatenate all OR'd terms.
69+
finalTerms = append(finalTerms, oldTerms...)
70+
}
71+
72+
for _, term := range finalTerms {
73+
term.sort()
74+
}
75+
return finalTerms
76+
}
77+
78+
func getTopologyKeys(csiNode *storagev1.CSINode, driverName string) []string {
79+
for _, driver := range csiNode.Spec.Drivers {
80+
if driver.Name == driverName {
81+
return driver.TopologyKeys
82+
}
83+
}
84+
return nil
85+
}
86+
87+
func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTerm, isMissingKey bool) {
88+
term = make(topologyTerm, 0, len(topologyKeys))
89+
for _, key := range topologyKeys {
90+
v, ok := node.Labels[key]
91+
if !ok {
92+
return nil, true
93+
}
94+
term = append(term, topologySegment{key, v})
95+
}
96+
term.sort()
97+
return term, false
98+
}
99+
100+
func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) {
101+
var expr []metav1.LabelSelectorRequirement
102+
for _, key := range topologyKeys {
103+
expr = append(expr, metav1.LabelSelectorRequirement{
104+
Key: key,
105+
Operator: metav1.LabelSelectorOpExists,
106+
})
107+
}
108+
109+
labelSelector := metav1.LabelSelector{
110+
MatchExpressions: expr,
111+
}
112+
113+
selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
114+
if err != nil {
115+
return nil, fmt.Errorf("error parsing topology keys selector: %v", err)
116+
}
117+
118+
return selector, nil
119+
}
120+
121+
func (t topologyTerm) sort() {
122+
slices.SortFunc(t, func(a, b topologySegment) int {
123+
r := strings.Compare(a.Key, b.Key)
124+
if r != 0 {
125+
return r
126+
}
127+
// Should not happen currently. We may support multi-value in the future?
128+
return strings.Compare(a.Value, b.Value)
129+
})
130+
}
131+
132+
func (t topologyTerm) compare(other topologyTerm) int {
133+
if len(t) != len(other) {
134+
return len(t) - len(other)
135+
}
136+
for i, k1 := range t {
137+
k2 := other[i]
138+
r := strings.Compare(k1.Key, k2.Key)
139+
if r != 0 {
140+
return r
141+
}
142+
r = strings.Compare(k1.Value, k2.Value)
143+
if r != 0 {
144+
return r
145+
}
146+
}
147+
return 0
148+
}
149+
150+
func (t topologyTerm) subset(other topologyTerm) bool {
151+
if len(t) == 0 {
152+
return true
153+
}
154+
j := 0
155+
for _, k2 := range other {
156+
k1 := t[j]
157+
if k1.Key != k2.Key {
158+
continue
159+
}
160+
if k1.Value != k2.Value {
161+
return false
162+
}
163+
j++
164+
if j == len(t) {
165+
// All segments in t have been checked and is present in other.
166+
return true
167+
}
168+
}
169+
return false
170+
}
171+
172+
func toCSITopology(terms []topologyTerm) []*csi.Topology {
173+
out := make([]*csi.Topology, 0, len(terms))
174+
for _, term := range terms {
175+
segs := make(map[string]string, len(term))
176+
for _, k := range term {
177+
segs[k.Key] = k.Value
178+
}
179+
out = append(out, &csi.Topology{Segments: segs})
180+
}
181+
return out
182+
}

0 commit comments

Comments
 (0)