From 94bce307d70651b867bc4695640329aa0fb65a89 Mon Sep 17 00:00:00 2001 From: whitewindmills Date: Fri, 24 May 2024 17:17:50 +0800 Subject: [PATCH] Integrate UpdateStatus function Signed-off-by: whitewindmills --- .../cronfederatedhpa/cronfederatedhpa_job.go | 157 +++++++++--------- .../execution/execution_controller.go | 15 +- ...erated_resource_quota_status_controller.go | 17 +- .../mcs/service_import_controller.go | 26 +-- .../endpointslice_dispatch_controller.go | 15 +- .../multiclusterservice/mcs_controller.go | 15 +- .../remediation/remedy_controller.go | 28 +--- .../status/cluster_status_controller.go | 16 +- .../status/work_status_controller.go | 40 +---- pkg/util/helper/status.go | 69 ++++++++ pkg/util/helper/workstatus.go | 92 ++++------ 11 files changed, 223 insertions(+), 267 deletions(-) create mode 100755 pkg/util/helper/status.go diff --git a/pkg/controllers/cronfederatedhpa/cronfederatedhpa_job.go b/pkg/controllers/cronfederatedhpa/cronfederatedhpa_job.go index c50b4541171f..1a250a4d9c31 100755 --- a/pkg/controllers/cronfederatedhpa/cronfederatedhpa_job.go +++ b/pkg/controllers/cronfederatedhpa/cronfederatedhpa_job.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1" "github.com/karmada-io/karmada/pkg/metrics" @@ -226,52 +227,54 @@ func (c *ScalingJob) addFailedExecutionHistory( cronFHPA *autoscalingv1alpha1.CronFederatedHPA, errMsg string) error { _, nextExecutionTime := c.scheduler.NextRun() - // Add success history record, return false if there is no such rule - addFailedHistoryFunc := func() bool { - exists := false - for index, rule := range cronFHPA.Status.ExecutionHistories { - if rule.RuleName != c.rule.Name { - continue - } - failedExecution := autoscalingv1alpha1.FailedExecution{ - ScheduleTime: rule.NextExecutionTime, - ExecutionTime: &metav1.Time{Time: time.Now()}, - Message: errMsg, - } - historyLimits := helper.GetCronFederatedHPAFailedHistoryLimits(c.rule) - if len(rule.FailedExecutions) > historyLimits-1 { - rule.FailedExecutions = rule.FailedExecutions[:historyLimits-1] - } - cronFHPA.Status.ExecutionHistories[index].FailedExecutions = - append([]autoscalingv1alpha1.FailedExecution{failedExecution}, rule.FailedExecutions...) - cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime} - exists = true - break + // Add failed history record + addFailedHistoryFunc := func(index int) { + failedExecution := autoscalingv1alpha1.FailedExecution{ + ScheduleTime: cronFHPA.Status.ExecutionHistories[index].NextExecutionTime, + ExecutionTime: &metav1.Time{Time: time.Now()}, + Message: errMsg, } - - return exists + historyLimits := helper.GetCronFederatedHPAFailedHistoryLimits(c.rule) + if len(cronFHPA.Status.ExecutionHistories[index].FailedExecutions) > historyLimits-1 { + cronFHPA.Status.ExecutionHistories[index].FailedExecutions = cronFHPA.Status.ExecutionHistories[index].FailedExecutions[:historyLimits-1] + } + cronFHPA.Status.ExecutionHistories[index].FailedExecutions = + append([]autoscalingv1alpha1.FailedExecution{failedExecution}, cronFHPA.Status.ExecutionHistories[index].FailedExecutions...) 
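+		// Record the upcoming run; the next history entry reads it back as its ScheduleTime.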
+		cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
 	}
 
-	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
-		// If this history not exist, it means the rule is suspended or deleted, so just ignore it.
-		if exists := addFailedHistoryFunc(); !exists {
+	var operationResult controllerutil.OperationResult
+	if err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
+		operationResult, err = helper.UpdateStatus(context.Background(), c.client, cronFHPA, func() error {
+			index := c.findExecutionHistory(cronFHPA.Status.ExecutionHistories)
+			if index < 0 {
+				// The failed history record does not exist, which means the rule has been deleted, so just ignore it.
+				return nil
+			}
+			addFailedHistoryFunc(index)
 			return nil
-		}
+		})
+		return err
+	}); err != nil {
+		klog.Errorf("Failed to add failed history record to CronFederatedHPA(%s/%s): %v", cronFHPA.Namespace, cronFHPA.Name, err)
+		return err
+	}
 
-		updateErr := c.client.Status().Update(context.Background(), cronFHPA)
-		if updateErr == nil {
-			klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
-			return nil
-		}
+	if operationResult == controllerutil.OperationResultUpdatedStatusOnly {
+		klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
+	}
 
-		updated := &autoscalingv1alpha1.CronFederatedHPA{}
-		if err = c.client.Get(context.Background(), client.ObjectKey{Namespace: cronFHPA.Namespace, Name: cronFHPA.Name}, updated); err == nil {
-			cronFHPA = updated
-		} else {
-			klog.Errorf("Get CronFederatedHPA(%s/%s) failed: %v", cronFHPA.Namespace, cronFHPA.Name, err)
+	return nil
+}
+
+// findExecutionHistory finds the history record of the job's rule; it returns -1 if there is no such rule.
+func (c *ScalingJob) findExecutionHistory(histories []autoscalingv1alpha1.ExecutionHistory) int {
+	for index, rule := range histories {
+		if rule.RuleName == c.rule.Name {
+			return index
 		}
-		return updateErr
-	})
+	}
+	return -1
 }
 
 func (c *ScalingJob) addSuccessExecutionHistory(
@@ -279,52 +282,44 @@ func (c *ScalingJob) addSuccessExecutionHistory(
 	appliedReplicas, appliedMinReplicas, appliedMaxReplicas *int32) error {
 	_, nextExecutionTime := c.scheduler.NextRun()
 
-	// Add success history record, return false if there is no such rule
-	addSuccessHistoryFunc := func() bool {
-		exists := false
-		for index, rule := range cronFHPA.Status.ExecutionHistories {
-			if rule.RuleName != c.rule.Name {
-				continue
-			}
-			successExecution := autoscalingv1alpha1.SuccessfulExecution{
-				ScheduleTime:       rule.NextExecutionTime,
-				ExecutionTime:      &metav1.Time{Time: time.Now()},
-				AppliedReplicas:    appliedReplicas,
-				AppliedMaxReplicas: appliedMaxReplicas,
-				AppliedMinReplicas: appliedMinReplicas,
-			}
-			historyLimits := helper.GetCronFederatedHPASuccessHistoryLimits(c.rule)
-			if len(rule.SuccessfulExecutions) > historyLimits-1 {
-				rule.SuccessfulExecutions = rule.SuccessfulExecutions[:historyLimits-1]
-			}
-			cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions =
-				append([]autoscalingv1alpha1.SuccessfulExecution{successExecution}, rule.SuccessfulExecutions...)
-			cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
-			exists = true
-			break
+	// Add success history record
+	addSuccessHistoryFunc := func(index int) {
+		successExecution := autoscalingv1alpha1.SuccessfulExecution{
+			ScheduleTime:       cronFHPA.Status.ExecutionHistories[index].NextExecutionTime,
+			ExecutionTime:      &metav1.Time{Time: time.Now()},
+			AppliedReplicas:    appliedReplicas,
+			AppliedMaxReplicas: appliedMaxReplicas,
+			AppliedMinReplicas: appliedMinReplicas,
 		}
-
-		return exists
+		historyLimits := helper.GetCronFederatedHPASuccessHistoryLimits(c.rule)
+		if len(cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions) > historyLimits-1 {
+			cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions = cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions[:historyLimits-1]
+		}
+		cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions =
+			append([]autoscalingv1alpha1.SuccessfulExecution{successExecution}, cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions...)
+		cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
 	}
 
-	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
-		// If this history not exist, it means the rule deleted, so just ignore it.
-		if exists := addSuccessHistoryFunc(); !exists {
+	var operationResult controllerutil.OperationResult
+	if err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
+		operationResult, err = helper.UpdateStatus(context.Background(), c.client, cronFHPA, func() error {
+			index := c.findExecutionHistory(cronFHPA.Status.ExecutionHistories)
+			if index < 0 {
+				// The success history record does not exist, which means the rule has been deleted, so just ignore it.
+				return nil
+			}
+			addSuccessHistoryFunc(index)
 			return nil
-		}
+		})
+		return err
+	}); err != nil {
+		klog.Errorf("Failed to add success history record to CronFederatedHPA(%s/%s): %v", cronFHPA.Namespace, cronFHPA.Name, err)
+		return err
+	}
 
-		updateErr := c.client.Status().Update(context.Background(), cronFHPA)
-		if updateErr == nil {
-			klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
-			return err
-		}
+	if operationResult == controllerutil.OperationResultUpdatedStatusOnly {
+		klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
+	}
 
-		updated := &autoscalingv1alpha1.CronFederatedHPA{}
-		if err = c.client.Get(context.Background(), client.ObjectKey{Namespace: cronFHPA.Namespace, Name: cronFHPA.Name}, updated); err == nil {
-			cronFHPA = updated
-		} else {
-			klog.Errorf("Get CronFederatedHPA(%s/%s) failed: %v", cronFHPA.Namespace, cronFHPA.Name, err)
-		}
-		return updateErr
-	})
+	return nil
 }
diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go
index d484ff90d4d8..8a398576c2bd 100644
--- a/pkg/controllers/execution/execution_controller.go
+++ b/pkg/controllers/execution/execution_controller.go
@@ -258,18 +258,11 @@ func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status meta
 	}
 
 	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
-		meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
-		updateErr := c.Status().Update(context.TODO(), work)
-		if updateErr == nil {
+		_, err = helper.UpdateStatus(context.Background(), c.Client, work, func() error {
+			meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
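+			// UpdateStatus deep-compares the object before and after this
+			// callback and skips the API call when nothing has changed.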
return nil - } - updated := &workv1alpha1.Work{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: work.Namespace, Name: work.Name}, updated); err == nil { - work = updated - } else { - klog.Errorf("Failed to get the updated work(%s/%s), err: %v", work.Namespace, work.Name, err) - } - return updateErr + }) + return err }) } diff --git a/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go b/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go index b7d1533022c6..23039535f0bd 100644 --- a/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go +++ b/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go @@ -160,20 +160,11 @@ func (c *StatusController) collectQuotaStatus(quota *policyv1alpha1.FederatedRes } return retry.RetryOnConflict(retry.DefaultRetry, func() error { - quota.Status = *quotaStatus - updateErr := c.Status().Update(context.TODO(), quota) - if updateErr == nil { + _, err = helper.UpdateStatus(context.Background(), c.Client, quota, func() error { + quota.Status = *quotaStatus return nil - } - - updated := &policyv1alpha1.FederatedResourceQuota{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: quota.Namespace, Name: quota.Name}, updated); err == nil { - quota = updated - } else { - klog.Errorf("Failed to get updated federatedResourceQuota(%s): %v", klog.KObj(quota).String(), err) - } - - return updateErr + }) + return err }) } diff --git a/pkg/controllers/mcs/service_import_controller.go b/pkg/controllers/mcs/service_import_controller.go index 329e5256e0c5..2acb71f62570 100644 --- a/pkg/controllers/mcs/service_import_controller.go +++ b/pkg/controllers/mcs/service_import_controller.go @@ -32,6 +32,7 @@ import ( "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" ) @@ -153,24 +154,15 @@ func (c *ServiceImportController) updateServiceStatus(svcImport *mcsv1alpha1.Ser } err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - derivedService.Status = corev1.ServiceStatus{ - LoadBalancer: corev1.LoadBalancerStatus{ - Ingress: ingress, - }, - } - updateErr := c.Status().Update(context.TODO(), derivedService) - if updateErr == nil { + _, err = helper.UpdateStatus(context.Background(), c.Client, derivedService, func() error { + derivedService.Status = corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{ + Ingress: ingress, + }, + } return nil - } - - updated := &corev1.Service{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: derivedService.Namespace, Name: derivedService.Name}, updated); err == nil { - derivedService = updated - } else { - klog.Errorf("Failed to get updated service %s/%s: %v", derivedService.Namespace, derivedService.Name, err) - } - - return updateErr + }) + return err }) if err != nil { diff --git a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go index 8175a65ef43f..b5d7e4261c5c 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go @@ -127,18 +127,11 @@ func (c *EndpointsliceDispatchController) updateEndpointSliceDispatched(mcs *net } return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - 
meta.SetStatusCondition(&mcs.Status.Conditions, EndpointSliceCollected) - updateErr := c.Status().Update(context.TODO(), mcs) - if updateErr == nil { + _, err = helper.UpdateStatus(context.Background(), c.Client, mcs, func() error { + meta.SetStatusCondition(&mcs.Status.Conditions, EndpointSliceCollected) return nil - } - updated := &networkingv1alpha1.MultiClusterService{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: mcs.Namespace, Name: mcs.Name}, updated); err == nil { - mcs = updated - } else { - klog.Errorf("Failed to get updated MultiClusterService %s/%s: %v", mcs.Namespace, mcs.Name, err) - } - return updateErr + }) + return err }) } diff --git a/pkg/controllers/multiclusterservice/mcs_controller.go b/pkg/controllers/multiclusterservice/mcs_controller.go index 3e353261b0db..4a30b9683b66 100644 --- a/pkg/controllers/multiclusterservice/mcs_controller.go +++ b/pkg/controllers/multiclusterservice/mcs_controller.go @@ -525,18 +525,11 @@ func (c *MCSController) updateMultiClusterServiceStatus(mcs *networkingv1alpha1. } return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - meta.SetStatusCondition(&mcs.Status.Conditions, serviceAppliedCondition) - updateErr := c.Status().Update(context.TODO(), mcs) - if updateErr == nil { + _, err = helper.UpdateStatus(context.Background(), c.Client, mcs, func() error { + meta.SetStatusCondition(&mcs.Status.Conditions, serviceAppliedCondition) return nil - } - updated := &networkingv1alpha1.MultiClusterService{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: mcs.Namespace, Name: mcs.Name}, updated); err == nil { - mcs = updated - } else { - klog.Errorf("Failed to get updated MultiClusterService %s/%s: %v", mcs.Namespace, mcs.Name, err) - } - return updateErr + }) + return err }) } diff --git a/pkg/controllers/remediation/remedy_controller.go b/pkg/controllers/remediation/remedy_controller.go index c8424dea1a56..315c59d86e38 100644 --- a/pkg/controllers/remediation/remedy_controller.go +++ b/pkg/controllers/remediation/remedy_controller.go @@ -18,10 +18,8 @@ package remediation import ( "context" - "reflect" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" @@ -33,6 +31,7 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" + "github.com/karmada-io/karmada/pkg/util/helper" ) // ControllerName is the controller name that will be used when reporting events. @@ -70,26 +69,13 @@ func (c *RemedyController) Reconcile(ctx context.Context, req controllerruntime. 
} actions := calculateActions(clusterRelatedRemedies, cluster) - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - if reflect.DeepEqual(actions, cluster.Status.RemedyActions) { + if err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + _, err = helper.UpdateStatus(context.Background(), c.Client, cluster, func() error { + cluster.Status.RemedyActions = actions return nil - } - cluster.Status.RemedyActions = actions - updateErr := c.Client.Status().Update(ctx, cluster) - if updateErr == nil { - return nil - } - - updatedCluster := &clusterv1alpha1.Cluster{} - err = c.Client.Get(ctx, types.NamespacedName{Name: cluster.Name}, updatedCluster) - if err == nil { - cluster = updatedCluster - } else { - klog.Errorf("Failed to get updated cluster(%s): %v", cluster.Name, err) - } - return updateErr - }) - if err != nil { + }) + return err + }); err != nil { klog.Errorf("Failed to sync cluster(%s) remedy actions: %v", cluster.Name, err) return controllerruntime.Result{}, err } diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index d94bff005344..776b8ae45cb6 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -277,19 +277,11 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *clusterv1alpha1. if !equality.Semantic.DeepEqual(cluster.Status, currentClusterStatus) { klog.V(4).Infof("Start to update cluster status: %s", cluster.Name) err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - cluster.Status = currentClusterStatus - updateErr := c.Status().Update(context.TODO(), cluster) - if updateErr == nil { + _, err = helper.UpdateStatus(context.Background(), c.Client, cluster, func() error { + cluster.Status = currentClusterStatus return nil - } - - updated := &clusterv1alpha1.Cluster{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name}, updated); err == nil { - cluster = updated - } else { - klog.Errorf("Failed to get updated cluster %s: %v", cluster.Name, err) - } - return updateErr + }) + return err }) if err != nil { klog.Errorf("Failed to update health status of the member cluster: %v, err is : %v", cluster.Name, err) diff --git a/pkg/controllers/status/work_status_controller.go b/pkg/controllers/status/work_status_controller.go index 7f71c5873cd2..7178b00ab5e6 100644 --- a/pkg/controllers/status/work_status_controller.go +++ b/pkg/controllers/status/work_status_controller.go @@ -326,22 +326,11 @@ func (c *WorkStatusController) updateAppliedCondition(work *workv1alpha1.Work, s } err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - workStatus := work.Status.DeepCopy() - meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition) - if reflect.DeepEqual(*workStatus, work.Status) { + _, err = helper.UpdateStatus(context.Background(), c.Client, work, func() error { + meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition) return nil - } - updateErr := c.Status().Update(context.TODO(), work) - if updateErr == nil { - return nil - } - updated := &workv1alpha1.Work{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: work.Namespace, Name: work.Name}, updated); err == nil { - work = updated - } else { - klog.Errorf("Failed to get updated work %s/%s: %s", work.Namespace, work.Name, err.Error()) - } - return updateErr + }) + return err }) if err != nil { @@ -386,25 +375,12 @@ func (c *WorkStatusController) 
reflectStatus(work *workv1alpha1.Work, clusterObj
 		Health: resourceHealth,
 	}
 
-	workCopy := work.DeepCopy()
 	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
-		manifestStatuses := c.mergeStatus(workCopy.Status.ManifestStatuses, manifestStatus)
-		if reflect.DeepEqual(workCopy.Status.ManifestStatuses, manifestStatuses) {
+		_, err = helper.UpdateStatus(context.Background(), c.Client, work, func() error {
+			work.Status.ManifestStatuses = c.mergeStatus(work.Status.ManifestStatuses, manifestStatus)
 			return nil
-		}
-		workCopy.Status.ManifestStatuses = manifestStatuses
-		updateErr := c.Status().Update(context.TODO(), workCopy)
-		if updateErr == nil {
-			return nil
-		}
-
-		updated := &workv1alpha1.Work{}
-		if err = c.Get(context.TODO(), client.ObjectKey{Namespace: workCopy.Namespace, Name: workCopy.Name}, updated); err == nil {
-			workCopy = updated
-		} else {
-			klog.Errorf("Failed to get updated work %s/%s: %v", workCopy.Namespace, workCopy.Name, err)
-		}
-		return updateErr
+		})
+		return err
 	})
 }
diff --git a/pkg/util/helper/status.go b/pkg/util/helper/status.go
new file mode 100755
index 000000000000..2a41df6f81af
--- /dev/null
+++ b/pkg/util/helper/status.go
@@ -0,0 +1,69 @@
+/*
+Copyright 2024 The Karmada Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package helper
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/api/equality"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+)
+
+// UpdateStatus updates the given object's status in the Kubernetes
+// cluster. The object's desired state must be reconciled with the existing
+// state inside the passed-in callback MutateFn.
+//
+// The MutateFn is called when updating an object's status.
+//
+// It returns the executed operation and an error.
+//
+// Note: changes made by the MutateFn to anything other than the status
+// sub-resource will be ignored. Changes to the status sub-resource will
+// only be applied if the object already exists.
+func UpdateStatus(ctx context.Context, c client.Client, obj client.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) {
+	key := client.ObjectKeyFromObject(obj)
+	if err := c.Get(ctx, key, obj); err != nil {
+		return controllerutil.OperationResultNone, err
+	}
+
+	existing := obj.DeepCopyObject()
+	if err := mutate(f, key, obj); err != nil {
+		return controllerutil.OperationResultNone, err
+	}
+
+	if equality.Semantic.DeepEqual(existing, obj) {
+		return controllerutil.OperationResultNone, nil
+	}
+
+	if err := c.Status().Update(ctx, obj); err != nil {
+		return controllerutil.OperationResultNone, err
+	}
+	return controllerutil.OperationResultUpdatedStatusOnly, nil
+}
+
+// mutate wraps a MutateFn and applies validation to its result.
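+// It rejects any mutation that changes the object's name or namespace,
+// matching the guard controller-runtime applies inside CreateOrUpdate.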
+func mutate(f controllerutil.MutateFn, key client.ObjectKey, obj client.Object) error { + if err := f(); err != nil { + return err + } + if newKey := client.ObjectKeyFromObject(obj); key != newKey { + return fmt.Errorf("MutateFn cannot mutate object name and/or object namespace") + } + return nil +} diff --git a/pkg/util/helper/workstatus.go b/pkg/util/helper/workstatus.go index ca40aef2cdd9..8f3a275f5bf8 100644 --- a/pkg/util/helper/workstatus.go +++ b/pkg/util/helper/workstatus.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "fmt" - "reflect" "sort" corev1 "k8s.io/api/core/v1" @@ -34,6 +33,7 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" @@ -73,41 +73,28 @@ func AggregateResourceBindingWorkStatus( fullyAppliedCondition := generateFullyAppliedCondition(binding.Spec, aggregatedStatuses) - err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - currentBindingStatus := binding.Status.DeepCopy() - - binding.Status.AggregatedStatus = aggregatedStatuses - // set binding status with the newest condition - meta.SetStatusCondition(&binding.Status.Conditions, fullyAppliedCondition) - if reflect.DeepEqual(binding.Status, *currentBindingStatus) { - klog.V(4).Infof("New aggregatedStatuses are equal with old resourceBinding(%s/%s) AggregatedStatus, no update required.", - binding.Namespace, binding.Name) - return nil - } - - updateErr := c.Status().Update(context.TODO(), binding) - if updateErr == nil { + var operationResult controllerutil.OperationResult + if err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + operationResult, err = UpdateStatus(context.Background(), c, binding, func() error { + binding.Status.AggregatedStatus = aggregatedStatuses + // set binding status with the newest condition + meta.SetStatusCondition(&binding.Status.Conditions, fullyAppliedCondition) return nil - } - - updated := &workv1alpha2.ResourceBinding{} - if err = c.Get(context.TODO(), client.ObjectKey{Namespace: binding.Namespace, Name: binding.Name}, updated); err == nil { - binding = updated - } else { - klog.Errorf("Failed to get updated binding %s/%s: %v", binding.Namespace, binding.Name, err) - } - - return updateErr - }) - if err != nil { + }) + return err + }); err != nil { eventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) eventRecorder.Event(resourceTemplate, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) return err } - msg := fmt.Sprintf("Update resourceBinding(%s/%s) with AggregatedStatus successfully.", binding.Namespace, binding.Name) - eventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) - eventRecorder.Event(resourceTemplate, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + if operationResult == controllerutil.OperationResultUpdatedStatusOnly { + msg := fmt.Sprintf("Update ResourceBinding(%s/%s) with AggregatedStatus successfully.", binding.Namespace, binding.Name) + eventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + eventRecorder.Event(resourceTemplate, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + } else { + klog.Infof("New aggregatedStatuses are equal with old 
ResourceBinding(%s/%s) AggregatedStatus, no update required.", binding.Namespace, binding.Name) + } return nil } @@ -131,40 +118,29 @@ func AggregateClusterResourceBindingWorkStatus( fullyAppliedCondition := generateFullyAppliedCondition(binding.Spec, aggregatedStatuses) - err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - currentBindingStatus := binding.Status.DeepCopy() - - binding.Status.AggregatedStatus = aggregatedStatuses - // set binding status with the newest condition - meta.SetStatusCondition(&binding.Status.Conditions, fullyAppliedCondition) - if reflect.DeepEqual(binding.Status, *currentBindingStatus) { - klog.Infof("New aggregatedStatuses are equal with old clusterResourceBinding(%s) AggregatedStatus, no update required.", binding.Name) + var operationResult controllerutil.OperationResult + if err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + operationResult, err = UpdateStatus(context.Background(), c, binding, func() error { + binding.Status.AggregatedStatus = aggregatedStatuses + // set binding status with the newest condition + meta.SetStatusCondition(&binding.Status.Conditions, fullyAppliedCondition) return nil - } - - updateErr := c.Status().Update(context.TODO(), binding) - if updateErr == nil { - return nil - } - - updated := &workv1alpha2.ClusterResourceBinding{} - if err = c.Get(context.TODO(), client.ObjectKey{Name: binding.Name}, updated); err == nil { - binding = updated - } else { - klog.Errorf("Failed to get updated binding %s/%s: %v", binding.Namespace, binding.Name, err) - } - - return updateErr - }) - if err != nil { + }) + return err + }); err != nil { eventRecorder.Event(binding, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) eventRecorder.Event(resourceTemplate, corev1.EventTypeWarning, events.EventReasonAggregateStatusFailed, err.Error()) return err } - msg := fmt.Sprintf("Update clusterResourceBinding(%s) with AggregatedStatus successfully.", binding.Name) - eventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) - eventRecorder.Event(resourceTemplate, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + if operationResult == controllerutil.OperationResultUpdatedStatusOnly { + msg := fmt.Sprintf("Update ClusterResourceBinding(%s) with AggregatedStatus successfully.", binding.Name) + eventRecorder.Event(binding, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + eventRecorder.Event(resourceTemplate, corev1.EventTypeNormal, events.EventReasonAggregateStatusSucceed, msg) + } else { + klog.Infof("New aggregatedStatuses are equal with old ClusterResourceBinding(%s) AggregatedStatus, no update required.", binding.Name) + } + return nil }
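
For reviewers, a minimal sketch of the calling pattern this patch converges on: retry.RetryOnConflict wrapping helper.UpdateStatus, with the MutateFn touching only status fields. The sketch is not part of the patch; the function name syncRemedyActions and the []string type of actions are illustrative assumptions.

package example

import (
	"context"

	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util/helper"
)

// syncRemedyActions mirrors the remedy controller change above: mutate only
// status fields inside the callback and let UpdateStatus decide whether an
// API call is needed.
func syncRemedyActions(ctx context.Context, c client.Client, cluster *clusterv1alpha1.Cluster, actions []string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
		// UpdateStatus re-fetches the cluster, applies the mutation, and only
		// issues a status update when the object actually changed.
		_, err = helper.UpdateStatus(ctx, c, cluster, func() error {
			cluster.Status.RemedyActions = actions // status-only mutation
			return nil
		})
		return err
	})
}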