Commit

Integrate UpdateStatus function
Signed-off-by: whitewindmills <jayfantasyhjh@gmail.com>
whitewindmills committed Jun 5, 2024
1 parent 2220124 commit 94bce30
Showing 11 changed files with 223 additions and 267 deletions.
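The hunks below all make the same change: each controller's hand-rolled status-update loop (Status().Update followed by a re-Get on conflict) is collapsed into a single call to a shared helper. The helper's own implementation is not part of the hunks shown on this page, so the sketch below is only an inference from its call sites — the signature (client, object, mutate callback, controllerutil.OperationResult return value) is taken from the diffs, while the body is an assumption modeled on controller-runtime's CreateOrUpdate pattern and may differ from the real pkg/util/helper code.

// Hypothetical reconstruction of the UpdateStatus helper this commit calls.
// Inferred from the call sites in the diffs below; the actual implementation
// in pkg/util/helper may differ.
package helper

import (
	"context"

	"k8s.io/apimachinery/pkg/api/equality"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// UpdateStatus re-reads obj, applies mutateFn to it, and writes the status
// subresource only when the mutation actually changed something, reporting
// the outcome as a controllerutil.OperationResult.
func UpdateStatus(ctx context.Context, c client.Client, obj client.Object, mutateFn controllerutil.MutateFn) (controllerutil.OperationResult, error) {
	// Start from the latest version so a retry after a conflict sees fresh data.
	if err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
		return controllerutil.OperationResultNone, err
	}

	// Snapshot the object before mutation to detect no-op updates.
	existing := obj.DeepCopyObject()
	if err := mutateFn(); err != nil {
		return controllerutil.OperationResultNone, err
	}
	if equality.Semantic.DeepEqual(existing, obj) {
		return controllerutil.OperationResultNone, nil
	}

	if err := c.Status().Update(ctx, obj); err != nil {
		return controllerutil.OperationResultNone, err
	}
	return controllerutil.OperationResultUpdatedStatusOnly, nil
}

If the helper behaves roughly like this, the callers below get no-op detection for free, which would explain why the remedy controller diff can drop its explicit reflect.DeepEqual check along with the "reflect" import.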
157 changes: 76 additions & 81 deletions pkg/controllers/cronfederatedhpa/cronfederatedhpa_job.go
@@ -32,6 +32,7 @@ import (
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

autoscalingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/autoscaling/v1alpha1"
"github.com/karmada-io/karmada/pkg/metrics"
@@ -226,105 +227,99 @@ func (c *ScalingJob) addFailedExecutionHistory(
cronFHPA *autoscalingv1alpha1.CronFederatedHPA, errMsg string) error {
_, nextExecutionTime := c.scheduler.NextRun()

// Add success history record, return false if there is no such rule
addFailedHistoryFunc := func() bool {
exists := false
for index, rule := range cronFHPA.Status.ExecutionHistories {
if rule.RuleName != c.rule.Name {
continue
}
failedExecution := autoscalingv1alpha1.FailedExecution{
ScheduleTime: rule.NextExecutionTime,
ExecutionTime: &metav1.Time{Time: time.Now()},
Message: errMsg,
}
historyLimits := helper.GetCronFederatedHPAFailedHistoryLimits(c.rule)
if len(rule.FailedExecutions) > historyLimits-1 {
rule.FailedExecutions = rule.FailedExecutions[:historyLimits-1]
}
cronFHPA.Status.ExecutionHistories[index].FailedExecutions =
append([]autoscalingv1alpha1.FailedExecution{failedExecution}, rule.FailedExecutions...)
cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
exists = true
break
// Add failed history record
addFailedHistoryFunc := func(index int) {
failedExecution := autoscalingv1alpha1.FailedExecution{
ScheduleTime: cronFHPA.Status.ExecutionHistories[index].NextExecutionTime,
ExecutionTime: &metav1.Time{Time: time.Now()},
Message: errMsg,
}

return exists
historyLimits := helper.GetCronFederatedHPAFailedHistoryLimits(c.rule)
if len(cronFHPA.Status.ExecutionHistories[index].FailedExecutions) > historyLimits-1 {
cronFHPA.Status.ExecutionHistories[index].FailedExecutions = cronFHPA.Status.ExecutionHistories[index].FailedExecutions[:historyLimits-1]
}
cronFHPA.Status.ExecutionHistories[index].FailedExecutions =
append([]autoscalingv1alpha1.FailedExecution{failedExecution}, cronFHPA.Status.ExecutionHistories[index].FailedExecutions...)
cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
}

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
// If this history not exist, it means the rule is suspended or deleted, so just ignore it.
if exists := addFailedHistoryFunc(); !exists {
var operationResult controllerutil.OperationResult
if err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
operationResult, err = helper.UpdateStatus(context.Background(), c.client, cronFHPA, func() error {
index := c.findExecutionHistory(cronFHPA.Status.ExecutionHistories)
if index < 0 {
// The failed history does not exist, it means the rule deleted, so just ignore it.
return nil
}
addFailedHistoryFunc(index)
return nil
}
})
return err
}); err != nil {
klog.Errorf("Failed to add failed history record to CronFederatedHPA(%s/%s): %v", cronFHPA.Namespace, cronFHPA.Name, err)
return err
}

updateErr := c.client.Status().Update(context.Background(), cronFHPA)
if updateErr == nil {
klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
return nil
}
if operationResult == controllerutil.OperationResultUpdatedStatusOnly {
klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
}

updated := &autoscalingv1alpha1.CronFederatedHPA{}
if err = c.client.Get(context.Background(), client.ObjectKey{Namespace: cronFHPA.Namespace, Name: cronFHPA.Name}, updated); err == nil {
cronFHPA = updated
} else {
klog.Errorf("Get CronFederatedHPA(%s/%s) failed: %v", cronFHPA.Namespace, cronFHPA.Name, err)
return nil
}

// findExecutionHistory finds the history record, returns -1 if there is no such rule.
func (c *ScalingJob) findExecutionHistory(histories []autoscalingv1alpha1.ExecutionHistory) int {
for index, rule := range histories {
if rule.RuleName == c.rule.Name {
return index
}
return updateErr
})
}
return -1
}

func (c *ScalingJob) addSuccessExecutionHistory(
cronFHPA *autoscalingv1alpha1.CronFederatedHPA,
appliedReplicas, appliedMinReplicas, appliedMaxReplicas *int32) error {
_, nextExecutionTime := c.scheduler.NextRun()

// Add success history record, return false if there is no such rule
addSuccessHistoryFunc := func() bool {
exists := false
for index, rule := range cronFHPA.Status.ExecutionHistories {
if rule.RuleName != c.rule.Name {
continue
}
successExecution := autoscalingv1alpha1.SuccessfulExecution{
ScheduleTime: rule.NextExecutionTime,
ExecutionTime: &metav1.Time{Time: time.Now()},
AppliedReplicas: appliedReplicas,
AppliedMaxReplicas: appliedMaxReplicas,
AppliedMinReplicas: appliedMinReplicas,
}
historyLimits := helper.GetCronFederatedHPASuccessHistoryLimits(c.rule)
if len(rule.SuccessfulExecutions) > historyLimits-1 {
rule.SuccessfulExecutions = rule.SuccessfulExecutions[:historyLimits-1]
}
cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions =
append([]autoscalingv1alpha1.SuccessfulExecution{successExecution}, rule.SuccessfulExecutions...)
cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
exists = true
break
// Add success history record
addSuccessHistoryFunc := func(index int) {
successExecution := autoscalingv1alpha1.SuccessfulExecution{
ScheduleTime: cronFHPA.Status.ExecutionHistories[index].NextExecutionTime,
ExecutionTime: &metav1.Time{Time: time.Now()},
AppliedReplicas: appliedReplicas,
AppliedMaxReplicas: appliedMaxReplicas,
AppliedMinReplicas: appliedMinReplicas,
}

return exists
historyLimits := helper.GetCronFederatedHPASuccessHistoryLimits(c.rule)
if len(cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions) > historyLimits-1 {
cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions = cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions[:historyLimits-1]
}
cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions =
append([]autoscalingv1alpha1.SuccessfulExecution{successExecution}, cronFHPA.Status.ExecutionHistories[index].SuccessfulExecutions...)
cronFHPA.Status.ExecutionHistories[index].NextExecutionTime = &metav1.Time{Time: nextExecutionTime}
}

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
// If this history not exist, it means the rule deleted, so just ignore it.
if exists := addSuccessHistoryFunc(); !exists {
var operationResult controllerutil.OperationResult
if err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
operationResult, err = helper.UpdateStatus(context.Background(), c.client, cronFHPA, func() error {
index := c.findExecutionHistory(cronFHPA.Status.ExecutionHistories)
if index < 0 {
// The success history does not exist, it means the rule deleted, so just ignore it.
return nil
}
addSuccessHistoryFunc(index)
return nil
}
})
return err
}); err != nil {
klog.Errorf("Failed to add success history record to CronFederatedHPA(%s/%s): %v", cronFHPA.Namespace, cronFHPA.Name, err)
return err
}

updateErr := c.client.Status().Update(context.Background(), cronFHPA)
if updateErr == nil {
klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
return err
}
if operationResult == controllerutil.OperationResultUpdatedStatusOnly {
klog.V(4).Infof("CronFederatedHPA(%s/%s) status has been updated successfully", cronFHPA.Namespace, cronFHPA.Name)
}

updated := &autoscalingv1alpha1.CronFederatedHPA{}
if err = c.client.Get(context.Background(), client.ObjectKey{Namespace: cronFHPA.Namespace, Name: cronFHPA.Name}, updated); err == nil {
cronFHPA = updated
} else {
klog.Errorf("Get CronFederatedHPA(%s/%s) failed: %v", cronFHPA.Namespace, cronFHPA.Name, err)
}
return updateErr
})
return nil
}
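Both functions above now log the "status has been updated successfully" message only when the helper reports that the status subresource was actually written. For reference, controller-runtime's controllerutil package defines the OperationResult values shown below; the snippet is illustrative only (describeResult is a made-up helper) and is not part of this commit.

package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// describeResult is a throwaway helper for illustration: it spells out what
// each OperationResult returned by an update helper means to the caller.
func describeResult(r controllerutil.OperationResult) string {
	switch r {
	case controllerutil.OperationResultNone:
		return "nothing changed; no write was issued"
	case controllerutil.OperationResultCreated:
		return "the object was created"
	case controllerutil.OperationResultUpdated:
		return "the object's spec/metadata was updated"
	case controllerutil.OperationResultUpdatedStatus:
		return "the object and its status were updated"
	case controllerutil.OperationResultUpdatedStatusOnly:
		return "only the status subresource was written"
	default:
		return "unknown result"
	}
}

func main() {
	// The case the new CronFederatedHPA code checks before logging.
	fmt.Println(describeResult(controllerutil.OperationResultUpdatedStatusOnly))
}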
15 changes: 4 additions & 11 deletions pkg/controllers/execution/execution_controller.go
@@ -258,18 +258,11 @@ func (c *Controller) updateAppliedCondition(work *workv1alpha1.Work, status meta
}

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
updateErr := c.Status().Update(context.TODO(), work)
if updateErr == nil {
_, err = helper.UpdateStatus(context.Background(), c.Client, work, func() error {
meta.SetStatusCondition(&work.Status.Conditions, newWorkAppliedCondition)
return nil
}
updated := &workv1alpha1.Work{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: work.Namespace, Name: work.Name}, updated); err == nil {
work = updated
} else {
klog.Errorf("Failed to get the updated work(%s/%s), err: %v", work.Namespace, work.Name, err)
}
return updateErr
})
return err
})
}

@@ -160,20 +160,11 @@ func (c *StatusController) collectQuotaStatus(quota *policyv1alpha1.FederatedRes
}

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
quota.Status = *quotaStatus
updateErr := c.Status().Update(context.TODO(), quota)
if updateErr == nil {
_, err = helper.UpdateStatus(context.Background(), c.Client, quota, func() error {
quota.Status = *quotaStatus
return nil
}

updated := &policyv1alpha1.FederatedResourceQuota{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: quota.Namespace, Name: quota.Name}, updated); err == nil {
quota = updated
} else {
klog.Errorf("Failed to get updated federatedResourceQuota(%s): %v", klog.KObj(quota).String(), err)
}

return updateErr
})
return err
})
}

26 changes: 9 additions & 17 deletions pkg/controllers/mcs/service_import_controller.go
@@ -32,6 +32,7 @@ import (

"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)

@@ -153,24 +154,15 @@ func (c *ServiceImportController) updateServiceStatus(svcImport *mcsv1alpha1.Ser
}

err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
derivedService.Status = corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: ingress,
},
}
updateErr := c.Status().Update(context.TODO(), derivedService)
if updateErr == nil {
_, err = helper.UpdateStatus(context.Background(), c.Client, derivedService, func() error {
derivedService.Status = corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: ingress,
},
}
return nil
}

updated := &corev1.Service{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: derivedService.Namespace, Name: derivedService.Name}, updated); err == nil {
derivedService = updated
} else {
klog.Errorf("Failed to get updated service %s/%s: %v", derivedService.Namespace, derivedService.Name, err)
}

return updateErr
})
return err
})

if err != nil {
@@ -127,18 +127,11 @@ func (c *EndpointsliceDispatchController) updateEndpointSliceDispatched(mcs *net
}

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
meta.SetStatusCondition(&mcs.Status.Conditions, EndpointSliceCollected)
updateErr := c.Status().Update(context.TODO(), mcs)
if updateErr == nil {
_, err = helper.UpdateStatus(context.Background(), c.Client, mcs, func() error {
meta.SetStatusCondition(&mcs.Status.Conditions, EndpointSliceCollected)
return nil
}
updated := &networkingv1alpha1.MultiClusterService{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: mcs.Namespace, Name: mcs.Name}, updated); err == nil {
mcs = updated
} else {
klog.Errorf("Failed to get updated MultiClusterService %s/%s: %v", mcs.Namespace, mcs.Name, err)
}
return updateErr
})
return err
})
}

15 changes: 4 additions & 11 deletions pkg/controllers/multiclusterservice/mcs_controller.go
@@ -525,18 +525,11 @@ func (c *MCSController) updateMultiClusterServiceStatus(mcs *networkingv1alpha1.
}

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
meta.SetStatusCondition(&mcs.Status.Conditions, serviceAppliedCondition)
updateErr := c.Status().Update(context.TODO(), mcs)
if updateErr == nil {
_, err = helper.UpdateStatus(context.Background(), c.Client, mcs, func() error {
meta.SetStatusCondition(&mcs.Status.Conditions, serviceAppliedCondition)
return nil
}
updated := &networkingv1alpha1.MultiClusterService{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: mcs.Namespace, Name: mcs.Name}, updated); err == nil {
mcs = updated
} else {
klog.Errorf("Failed to get updated MultiClusterService %s/%s: %v", mcs.Namespace, mcs.Name, err)
}
return updateErr
})
return err
})
}

28 changes: 7 additions & 21 deletions pkg/controllers/remediation/remedy_controller.go
@@ -18,10 +18,8 @@ package remediation

import (
"context"
"reflect"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -33,6 +31,7 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util/helper"
)

// ControllerName is the controller name that will be used when reporting events.
@@ -70,26 +69,13 @@ func (c *RemedyController) Reconcile(ctx context.Context, req controllerruntime.
}

actions := calculateActions(clusterRelatedRemedies, cluster)
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
if reflect.DeepEqual(actions, cluster.Status.RemedyActions) {
if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err = helper.UpdateStatus(context.Background(), c.Client, cluster, func() error {
cluster.Status.RemedyActions = actions
return nil
}
cluster.Status.RemedyActions = actions
updateErr := c.Client.Status().Update(ctx, cluster)
if updateErr == nil {
return nil
}

updatedCluster := &clusterv1alpha1.Cluster{}
err = c.Client.Get(ctx, types.NamespacedName{Name: cluster.Name}, updatedCluster)
if err == nil {
cluster = updatedCluster
} else {
klog.Errorf("Failed to get updated cluster(%s): %v", cluster.Name, err)
}
return updateErr
})
if err != nil {
})
return err
}); err != nil {
klog.Errorf("Failed to sync cluster(%s) remedy actions: %v", cluster.Name, err)
return controllerruntime.Result{}, err
}
16 changes: 4 additions & 12 deletions pkg/controllers/status/cluster_status_controller.go
@@ -277,19 +277,11 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *clusterv1alpha1.
if !equality.Semantic.DeepEqual(cluster.Status, currentClusterStatus) {
klog.V(4).Infof("Start to update cluster status: %s", cluster.Name)
err := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
cluster.Status = currentClusterStatus
updateErr := c.Status().Update(context.TODO(), cluster)
if updateErr == nil {
_, err = helper.UpdateStatus(context.Background(), c.Client, cluster, func() error {
cluster.Status = currentClusterStatus
return nil
}

updated := &clusterv1alpha1.Cluster{}
if err = c.Get(context.TODO(), client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name}, updated); err == nil {
cluster = updated
} else {
klog.Errorf("Failed to get updated cluster %s: %v", cluster.Name, err)
}
return updateErr
})
return err
})
if err != nil {
klog.Errorf("Failed to update health status of the member cluster: %v, err is : %v", cluster.Name, err)
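Every remaining controller in this commit follows the same mechanical rewrite: retry.RetryOnConflict wraps a single helper.UpdateStatus call, and the manual re-Get after a conflicting write disappears. The standalone example below exercises that call shape against controller-runtime's fake client; it inlines the read-mutate-write step instead of importing the Karmada helper so that it runs on its own, and the names updateStatusWithRetry and derived-example are placeholders rather than identifiers from the commit. It assumes a controller-runtime release new enough to provide WithStatusSubresource.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// updateStatusWithRetry mirrors the call shape used throughout this commit:
// retry.RetryOnConflict around a read, a caller-supplied mutation, and a
// status write. The real controllers delegate the middle part to
// helper.UpdateStatus; here it is inlined so the example is self-contained.
func updateStatusWithRetry(ctx context.Context, c client.Client, svc *corev1.Service, mutate func() error) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Refresh the object so each attempt starts from the latest resourceVersion.
		if err := c.Get(ctx, client.ObjectKeyFromObject(svc), svc); err != nil {
			return err
		}
		if err := mutate(); err != nil {
			return err
		}
		return c.Status().Update(ctx, svc)
	})
}

func main() {
	svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "derived-example"}}
	c := fake.NewClientBuilder().WithObjects(svc).WithStatusSubresource(svc).Build()

	err := updateStatusWithRetry(context.Background(), c, svc, func() error {
		svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: "10.0.0.1"}}
		return nil
	})
	fmt.Println("update error:", err)
}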
