Skip to content

Commit 440f1cc

Browse files
authored
fix(semaphore): ensure holderKeys carry all information needed. Fixes #8684 (#13553)
Signed-off-by: isubasinghe <isitha@pipekit.io>
1 parent 0604fda commit 440f1cc

File tree

5 files changed

+499
-60
lines changed

5 files changed

+499
-60
lines changed

pkg/apis/workflow/v1alpha1/workflow_types.go

+20-20
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ const (
133133
VolumeClaimGCOnSuccess VolumeClaimGCStrategy = "OnWorkflowSuccess"
134134
)
135135

136+
// HoldingNameVersion identifies which format a lock holder key is written in.
// CheckHolderKeyVersion classifies a key into one of the values below.
type HoldingNameVersion int
137+
138+
const (
139+
// HoldingNameV1 is the legacy format: any key that does not split into
// exactly two or three "/"-separated parts.
HoldingNameV1 HoldingNameVersion = 1
140+
// HoldingNameV2 is the format made of two or three "/"-separated parts
// (presumably namespace/workflow or namespace/workflow/node — confirm against getHolderKey).
HoldingNameV2 HoldingNameVersion = 2
141+
)
142+
136143
// Workflow is the definition of a workflow resource
137144
// +genclient
138145
// +genclient:noStatus
@@ -3782,11 +3789,7 @@ func (ss *SemaphoreStatus) LockWaiting(holderKey, lockKey string, currentHolders
37823789

37833790
func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
37843791
i, semaphoreHolding := ss.GetHolding(lockKey)
3785-
items := strings.Split(holderKey, "/")
3786-
if len(items) == 0 {
3787-
return false
3788-
}
3789-
holdingName := items[len(items)-1]
3792+
holdingName := holderKey
37903793
if i < 0 {
37913794
ss.Holding = append(ss.Holding, SemaphoreHolding{Semaphore: lockKey, Holders: []string{holdingName}})
37923795
return true
@@ -3800,11 +3803,8 @@ func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolder
38003803

38013804
func (ss *SemaphoreStatus) LockReleased(holderKey, lockKey string) bool {
38023805
i, semaphoreHolding := ss.GetHolding(lockKey)
3803-
items := strings.Split(holderKey, "/")
3804-
if len(items) == 0 {
3805-
return false
3806-
}
3807-
holdingName := items[len(items)-1]
3806+
holdingName := holderKey
3807+
38083808
if i >= 0 {
38093809
semaphoreHolding.Holders = slice.RemoveString(semaphoreHolding.Holders, holdingName)
38103810
ss.Holding[i] = semaphoreHolding
@@ -3875,13 +3875,17 @@ func (ms *MutexStatus) LockWaiting(holderKey, lockKey string, currentHolders []s
38753875
return false
38763876
}
38773877

3878-
func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
3879-
i, mutexHolding := ms.GetHolding(lockKey)
3878+
// CheckHolderKeyVersion reports the format of holderKey. The key is split on
// "/"; exactly two or three parts yield HoldingNameV2, and every other
// count (including a key with no "/") yields HoldingNameV1.
func CheckHolderKeyVersion(holderKey string) HoldingNameVersion {
38803879
items := strings.Split(holderKey, "/")
3881-
if len(items) == 0 {
3882-
return false
3880+
if len(items) == 2 || len(items) == 3 {
3881+
return HoldingNameV2
38833882
}
3884-
holdingName := items[len(items)-1]
3883+
return HoldingNameV1
3884+
}
3885+
3886+
func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
3887+
i, mutexHolding := ms.GetHolding(lockKey)
3888+
holdingName := holderKey
38853889
if i < 0 {
38863890
ms.Holding = append(ms.Holding, MutexHolding{Mutex: lockKey, Holder: holdingName})
38873891
return true
@@ -3895,11 +3899,7 @@ func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []
38953899

38963900
func (ms *MutexStatus) LockReleased(holderKey, lockKey string) bool {
38973901
i, holder := ms.GetHolding(lockKey)
3898-
items := strings.Split(holderKey, "/")
3899-
if len(items) == 0 {
3900-
return false
3901-
}
3902-
holdingName := items[len(items)-1]
3902+
holdingName := holderKey
39033903
if i >= 0 && holder.Holder == holdingName {
39043904
ms.Holding = append(ms.Holding[:i], ms.Holding[i+1:]...)
39053905
return true

workflow/controller/operator_concurrency_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1092,17 +1092,17 @@ spec:
10921092
configMap:
10931093
name: cache-example-steps-simple
10941094
`)
1095+
wf.Name = "example-steps-simple-gas12"
10951096
cancel, controller := newController(wf)
10961097
defer cancel()
10971098

10981099
ctx := context.Background()
1099-
11001100
woc := newWorkflowOperationCtx(wf, controller)
11011101
woc.operate(ctx)
11021102

11031103
holdingJobs := make(map[string]string)
11041104
for _, node := range woc.wf.Status.Nodes {
1105-
holdingJobs[node.ID] = node.DisplayName
1105+
holdingJobs[fmt.Sprintf("%s/%s/%s", wf.Namespace, wf.Name, node.ID)] = node.DisplayName
11061106
}
11071107

11081108
// Check initial status: job-1 acquired the lock

workflow/sync/mutex_test.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ status:
107107
mutex:
108108
holding:
109109
- holder: synchronization-wf-level-xxs94
110-
mutex: default/mutex/test
110+
mutex: default/Mutex/test
111111
`
112112

113113
func TestMutexLock(t *testing.T) {
@@ -142,7 +142,7 @@ func TestMutexLock(t *testing.T) {
142142
assert.NotNil(t, wf.Status.Synchronization)
143143
assert.NotNil(t, wf.Status.Synchronization.Mutex)
144144
assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding)
145-
assert.Equal(t, wf.Name, wf.Status.Synchronization.Mutex.Holding[0].Holder)
145+
assert.Equal(t, getHolderKey(wf, ""), wf.Status.Synchronization.Mutex.Holding[0].Holder)
146146

147147
// Try to acquire again
148148
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization)
@@ -194,7 +194,7 @@ func TestMutexLock(t *testing.T) {
194194
assert.True(t, wfUpdate)
195195
assert.NotNil(t, wf2.Status.Synchronization)
196196
assert.NotNil(t, wf2.Status.Synchronization.Mutex)
197-
assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Mutex.Holding[0].Holder)
197+
assert.Equal(t, getHolderKey(wf2, ""), wf2.Status.Synchronization.Mutex.Holding[0].Holder)
198198
concurrenyMgr.ReleaseAll(wf2)
199199
assert.Nil(t, wf2.Status.Synchronization)
200200
})
@@ -216,7 +216,8 @@ func TestMutexLock(t *testing.T) {
216216
assert.NotNil(t, wf.Status.Synchronization)
217217
assert.NotNil(t, wf.Status.Synchronization.Mutex)
218218
assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding)
219-
assert.Equal(t, wf.Name, wf.Status.Synchronization.Mutex.Holding[0].Holder)
219+
expected := getHolderKey(wf, "")
220+
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)
220221

221222
// Try to acquire again
222223
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization)
@@ -271,7 +272,8 @@ func TestMutexLock(t *testing.T) {
271272
assert.True(t, wfUpdate)
272273
assert.NotNil(t, wf2.Status.Synchronization)
273274
assert.NotNil(t, wf2.Status.Synchronization.Mutex)
274-
assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Mutex.Holding[0].Holder)
275+
expected = getHolderKey(wf2, "")
276+
assert.Equal(t, expected, wf2.Status.Synchronization.Mutex.Holding[0].Holder)
275277
concurrenyMgr.ReleaseAll(wf2)
276278
assert.Nil(t, wf2.Status.Synchronization)
277279
})
@@ -395,7 +397,8 @@ func TestMutexTmplLevel(t *testing.T) {
395397
assert.True(t, wfUpdate)
396398
assert.NotNil(t, wf.Status.Synchronization)
397399
assert.NotNil(t, wf.Status.Synchronization.Mutex)
398-
assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
400+
expected := getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474")
401+
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)
399402

400403
// Try to acquire again
401404
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization)
@@ -410,7 +413,8 @@ func TestMutexTmplLevel(t *testing.T) {
410413
assert.False(t, wfUpdate)
411414
assert.False(t, status)
412415

413-
assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
416+
expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474")
417+
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)
414418
concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization)
415419
assert.NotNil(t, wf.Status.Synchronization)
416420
assert.NotNil(t, wf.Status.Synchronization.Mutex)
@@ -423,7 +427,8 @@ func TestMutexTmplLevel(t *testing.T) {
423427
assert.True(t, wfUpdate)
424428
assert.NotNil(t, wf.Status.Synchronization)
425429
assert.NotNil(t, wf.Status.Synchronization.Mutex)
426-
assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-2216915482", wf.Status.Synchronization.Mutex.Holding[0].Holder)
430+
expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482")
431+
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)
427432

428433
assert.NotEqual(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
429434
concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization)

workflow/sync/sync_manager.go

+95-23
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,77 @@ func (cm *Manager) CheckWorkflowExistence() {
6666
}
6767
}
6868

69+
// getUpgradedKey converts a holder key read from workflow status into the
// current key format. A key that CheckHolderKeyVersion does not classify as
// HoldingNameV1 is returned unchanged. A V1 key is rebuilt with getHolderKey:
// with an empty node name when level is WorkflowLevel, and with the legacy
// key itself as the node name for any other level.
func getUpgradedKey(wf *wfv1.Workflow, key string, level SyncLevelType) string {
70+
if wfv1.CheckHolderKeyVersion(key) == wfv1.HoldingNameV1 {
71+
if level == WorkflowLevel {
72+
return getHolderKey(wf, "")
73+
}
74+
return getHolderKey(wf, key)
75+
}
76+
return key
77+
}
78+
79+
// SyncLevelType describes where in a workflow a synchronization lock is
// declared, as determined by getWorkflowSyncLevelByName.
type SyncLevelType int
80+
81+
const (
82+
// WorkflowLevel means the lock matches the workflow-wide spec.synchronization.
WorkflowLevel SyncLevelType = 1
83+
// TemplateLevel means the lock matches the synchronization of one of the workflow's templates.
TemplateLevel SyncLevelType = 2
84+
// ErrorLevel is returned together with a non-nil error when the level could not be determined.
ErrorLevel SyncLevelType = 3
85+
)
86+
87+
// HoldingNameV1 keys can be of the form
88+
// x where x is a workflow name
89+
// unfortunately this doesn't differentiate between workflow level keys
90+
// and template level keys. So upgrading is a bit tricky here.
91+
92+
// given a legacy holding name x, namespace y and workflow name z.
93+
// in the case of a workflow level
94+
// if x != z
95+
// upgradedKey := y/z
96+
// elseif x == z
97+
// upgradedKey := y/z
98+
// in the case of a template level
99+
// if x != z
100+
// upgradedKey := y/z/x
101+
// elseif x == z
102+
// upgradedKey := y/z/x
103+
104+
// there is a possibility that
105+
// a synchronization exists both at the template level
106+
// and at the workflow level -> impossible to upgrade correctly
107+
// due to ambiguity. Currently we just assume workflow level.
108+
// getWorkflowSyncLevelByName reports whether lockName is declared at the
// workflow level or at a template level of wf.
//
// The workflow-level synchronization is checked first, so a lock declared at
// both levels is reported as WorkflowLevel. A GetLockName failure at the
// workflow level is returned immediately; failures on individual templates
// are remembered and the remaining templates are still checked.
//
// When nothing matches, the result is ErrorLevel together with the last
// template error, or a generic "unable to determine level" error if no
// template failed.
func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelType, error) {
109+
if wf.Spec.Synchronization != nil {
110+
syncLockName, err := GetLockName(wf.Spec.Synchronization, wf.Namespace)
111+
if err != nil {
112+
return ErrorLevel, err
113+
}
114+
checkName := syncLockName.EncodeName()
115+
if lockName == checkName {
116+
return WorkflowLevel, nil
117+
}
118+
}
119+
120+
var lastErr error
121+
for _, template := range wf.Spec.Templates {
122+
if template.Synchronization != nil {
123+
syncLockName, err := GetLockName(template.Synchronization, wf.Namespace)
124+
if err != nil {
125+
// Keep scanning: another template may still match lockName.
lastErr = err
126+
continue
127+
}
128+
checkName := syncLockName.EncodeName()
129+
if lockName == checkName {
130+
return TemplateLevel, nil
131+
}
132+
}
133+
}
134+
if lastErr == nil {
135+
lastErr = fmt.Errorf("was unable to determine level for %s", lockName)
136+
}
137+
return ErrorLevel, lastErr
138+
}
139+
69140
func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
70141
for _, wf := range wfs {
71142
if wf.Status.Synchronization == nil {
@@ -86,11 +157,17 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
86157
}
87158

88159
for _, holders := range holding.Holders {
89-
resourceKey := getResourceKey(wf.Namespace, wf.Name, holders)
90-
if semaphore != nil && semaphore.acquire(resourceKey) {
91-
log.Infof("Lock acquired by %s from %s", resourceKey, holding.Semaphore)
160+
level, err := getWorkflowSyncLevelByName(&wf, holding.Semaphore)
161+
if err != nil {
162+
log.Warnf("cannot obtain lock level for '%s' : %v", holding.Semaphore, err)
163+
continue
164+
}
165+
key := getUpgradedKey(&wf, holders, level)
166+
if semaphore != nil && semaphore.acquire(key) {
167+
log.Infof("Lock acquired by %s from %s", key, holding.Semaphore)
92168
}
93169
}
170+
94171
}
95172
}
96173

@@ -101,8 +178,13 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
101178
if mutex == nil {
102179
mutex := cm.initializeMutex(holding.Mutex)
103180
if holding.Holder != "" {
104-
resourceKey := getResourceKey(wf.Namespace, wf.Name, holding.Holder)
105-
mutex.acquire(resourceKey)
181+
level, err := getWorkflowSyncLevelByName(&wf, holding.Mutex)
182+
if err != nil {
183+
log.Warnf("cannot obtain lock level for '%s' : %v", holding.Mutex, err)
184+
continue
185+
}
186+
key := getUpgradedKey(&wf, holding.Holder, level)
187+
mutex.acquire(key)
106188
}
107189
cm.syncLockMap[holding.Mutex] = mutex
108190
}
@@ -214,10 +296,9 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
214296
}
215297

216298
for _, holderKey := range holding.Holders {
217-
resourceKey := getResourceKey(wf.Namespace, wf.Name, holderKey)
218-
syncLockHolder.release(resourceKey)
299+
syncLockHolder.release(holderKey)
219300
wf.Status.Synchronization.Semaphore.LockReleased(holderKey, holding.Semaphore)
220-
log.Infof("%s released a lock from %s", resourceKey, holding.Semaphore)
301+
log.Infof("%s released a lock from %s", holderKey, holding.Semaphore)
221302
}
222303
}
223304

@@ -227,8 +308,8 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
227308
if syncLockHolder == nil {
228309
continue
229310
}
230-
resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name)
231-
syncLockHolder.removeFromQueue(resourceKey)
311+
key := getHolderKey(wf, "")
312+
syncLockHolder.removeFromQueue(key)
232313
}
233314
wf.Status.Synchronization.Semaphore = nil
234315
}
@@ -240,10 +321,9 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
240321
continue
241322
}
242323

243-
resourceKey := getResourceKey(wf.Namespace, wf.Name, holding.Holder)
244-
syncLockHolder.release(resourceKey)
324+
syncLockHolder.release(holding.Holder)
245325
wf.Status.Synchronization.Mutex.LockReleased(holding.Holder, holding.Mutex)
246-
log.Infof("%s released a lock from %s", resourceKey, holding.Mutex)
326+
log.Infof("%s released a lock from %s", holding.Holder, holding.Mutex)
247327
}
248328

249329
// Remove the pending Workflow level mutex keys
@@ -252,8 +332,8 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
252332
if syncLockHolder == nil {
253333
continue
254334
}
255-
resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name)
256-
syncLockHolder.removeFromQueue(resourceKey)
335+
key := getHolderKey(wf, "")
336+
syncLockHolder.removeFromQueue(key)
257337
}
258338
wf.Status.Synchronization.Mutex = nil
259339
}
@@ -296,14 +376,6 @@ func getHolderKey(wf *wfv1.Workflow, nodeName string) string {
296376
return key
297377
}
298378

299-
func getResourceKey(namespace, wfName, resourceName string) string {
300-
resourceKey := fmt.Sprintf("%s/%s", namespace, wfName)
301-
if resourceName != wfName {
302-
resourceKey = fmt.Sprintf("%s/%s", resourceKey, resourceName)
303-
}
304-
return resourceKey
305-
}
306-
307379
func (cm *Manager) getCurrentLockHolders(lockName string) []string {
308380
if concurrency, ok := cm.syncLockMap[lockName]; ok {
309381
return concurrency.getCurrentHolders()

0 commit comments

Comments
 (0)