diff --git a/bootstrap/api/v1beta2/ck8sconfig_types.go b/bootstrap/api/v1beta2/ck8sconfig_types.go index 6498c56f..5d1c4a9d 100644 --- a/bootstrap/api/v1beta2/ck8sconfig_types.go +++ b/bootstrap/api/v1beta2/ck8sconfig_types.go @@ -51,6 +51,21 @@ type CK8sConfigSpec struct { // +optional AirGapped bool `json:"airGapped,omitempty"` + // The snap store proxy domain's scheme, e.g. "http" or "https" without "://" + // Defaults to "http". + // +optional + // +kubebuilder:default=http + // +kubebuilder:validation:Enum=http;https + SnapstoreProxyScheme string `json:"snapstoreProxyScheme,omitempty"` + + // The snap store proxy domain + // +optional + SnapstoreProxyDomain string `json:"snapstoreProxyDomain,omitempty"` + + // The snap store proxy ID + // +optional + SnapstoreProxyID string `json:"snapstoreProxyId,omitempty"` + // CK8sControlPlaneConfig is configuration for the control plane node. // +optional ControlPlaneConfig CK8sControlPlaneConfig `json:"controlPlane,omitempty"` diff --git a/bootstrap/api/v1beta2/inplace_upgrade_consts.go b/bootstrap/api/v1beta2/inplace_upgrade_consts.go index 213f0fbe..444d243d 100644 --- a/bootstrap/api/v1beta2/inplace_upgrade_consts.go +++ b/bootstrap/api/v1beta2/inplace_upgrade_consts.go @@ -1,10 +1,11 @@ package v1beta2 const ( - InPlaceUpgradeToAnnotation = "v1beta2.k8sd.io/in-place-upgrade-to" - InPlaceUpgradeStatusAnnotation = "v1beta2.k8sd.io/in-place-upgrade-status" - InPlaceUpgradeReleaseAnnotation = "v1beta2.k8sd.io/in-place-upgrade-release" - InPlaceUpgradeChangeIDAnnotation = "v1beta2.k8sd.io/in-place-upgrade-change-id" + InPlaceUpgradeToAnnotation = "v1beta2.k8sd.io/in-place-upgrade-to" + InPlaceUpgradeStatusAnnotation = "v1beta2.k8sd.io/in-place-upgrade-status" + InPlaceUpgradeReleaseAnnotation = "v1beta2.k8sd.io/in-place-upgrade-release" + InPlaceUpgradeChangeIDAnnotation = "v1beta2.k8sd.io/in-place-upgrade-change-id" + InPlaceUpgradeLastFailedAttemptAtAnnotation = "v1beta2.k8sd.io/in-place-upgrade-last-failed-attempt-at" 
) const ( @@ -17,4 +18,5 @@ const ( InPlaceUpgradeInProgressEvent = "InPlaceUpgradeInProgress" InPlaceUpgradeDoneEvent = "InPlaceUpgradeDone" InPlaceUpgradeFailedEvent = "InPlaceUpgradeFailed" + InPlaceUpgradeCancelledEvent = "InPlaceUpgradeCancelled" ) diff --git a/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigs.yaml b/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigs.yaml index c3fef462..d468a1d1 100644 --- a/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigs.yaml +++ b/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigs.yaml @@ -210,6 +210,21 @@ spec: items: type: string type: array + snapstoreProxyDomain: + description: The snap store proxy domain + type: string + snapstoreProxyId: + description: The snap store proxy ID + type: string + snapstoreProxyScheme: + default: http + description: |- + The snap store proxy domain's scheme, e.g. "http" or "https" without "://" + Defaults to "http". + enum: + - http + - https + type: string version: description: Version specifies the Kubernetes version. type: string diff --git a/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigtemplates.yaml b/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigtemplates.yaml index 15d63107..1ee229cc 100644 --- a/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigtemplates.yaml +++ b/bootstrap/config/crd/bases/bootstrap.cluster.x-k8s.io_ck8sconfigtemplates.yaml @@ -219,6 +219,21 @@ spec: items: type: string type: array + snapstoreProxyDomain: + description: The snap store proxy domain + type: string + snapstoreProxyId: + description: The snap store proxy ID + type: string + snapstoreProxyScheme: + default: http + description: |- + The snap store proxy domain's scheme, e.g. "http" or "https" without "://" + Defaults to "http". + enum: + - http + - https + type: string version: description: Version specifies the Kubernetes version. 
type: string diff --git a/bootstrap/config/rbac/role.yaml b/bootstrap/config/rbac/role.yaml index 50e1603a..0f53d963 100644 --- a/bootstrap/config/rbac/role.yaml +++ b/bootstrap/config/rbac/role.yaml @@ -62,6 +62,19 @@ rules: - get - list - watch +- apiGroups: + - cluster.x-k8s.io + resources: + - machinedeployments + - machinedeployments/status + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - cluster.x-k8s.io resources: @@ -75,6 +88,15 @@ rules: - patch - update - watch +- apiGroups: + - cluster.x-k8s.io + resources: + - machinesets + - machinesets/status + verbs: + - get + - list + - watch - apiGroups: - exp.cluster.x-k8s.io resources: diff --git a/bootstrap/controllers/certificates_controller.go b/bootstrap/controllers/certificates_controller.go index 6e313a60..3ec9164b 100644 --- a/bootstrap/controllers/certificates_controller.go +++ b/bootstrap/controllers/certificates_controller.go @@ -233,6 +233,9 @@ func (r *CertificatesReconciler) updateExpiryDateAnnotation(ctx context.Context, } mAnnotations := scope.Machine.GetAnnotations() + if mAnnotations == nil { + mAnnotations = map[string]string{} + } expiryDateString, err := scope.Workload.GetCertificatesExpiryDate(ctx, scope.Machine, *nodeToken) if err != nil { diff --git a/bootstrap/controllers/ck8sconfig_controller.go b/bootstrap/controllers/ck8sconfig_controller.go index 79539352..c0d694f0 100644 --- a/bootstrap/controllers/ck8sconfig_controller.go +++ b/bootstrap/controllers/ck8sconfig_controller.go @@ -258,19 +258,25 @@ func (r *CK8sConfigReconciler) joinControlplane(ctx context.Context, scope *Scop return err } + snapInstallData := r.resolveInPlaceUpgradeRelease(machine) + input := cloudinit.JoinControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ - BootCommands: scope.Config.Spec.BootCommands, - PreRunCommands: scope.Config.Spec.PreRunCommands, - PostRunCommands: scope.Config.Spec.PostRunCommands, - KubernetesVersion: scope.Config.Spec.Version, - ExtraFiles: 
cloudinit.FilesFromAPI(files), - ConfigFileContents: string(joinConfig), - MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, - MicroclusterPort: microclusterPort, - AirGapped: scope.Config.Spec.AirGapped, - NodeName: scope.Config.Spec.NodeName, - NodeToken: *nodeToken, + BootCommands: scope.Config.Spec.BootCommands, + PreRunCommands: scope.Config.Spec.PreRunCommands, + PostRunCommands: scope.Config.Spec.PostRunCommands, + KubernetesVersion: scope.Config.Spec.Version, + SnapInstallData: snapInstallData, + ExtraFiles: cloudinit.FilesFromAPI(files), + ConfigFileContents: string(joinConfig), + MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, + MicroclusterPort: microclusterPort, + AirGapped: scope.Config.Spec.AirGapped, + SnapstoreProxyScheme: scope.Config.Spec.SnapstoreProxyScheme, + SnapstoreProxyDomain: scope.Config.Spec.SnapstoreProxyDomain, + SnapstoreProxyID: scope.Config.Spec.SnapstoreProxyID, + NodeName: scope.Config.Spec.NodeName, + NodeToken: *nodeToken, }, JoinToken: joinToken, } @@ -337,19 +343,25 @@ func (r *CK8sConfigReconciler) joinWorker(ctx context.Context, scope *Scope) err return err } + snapInstallData := r.resolveInPlaceUpgradeRelease(machine) + input := cloudinit.JoinWorkerInput{ BaseUserData: cloudinit.BaseUserData{ - BootCommands: scope.Config.Spec.BootCommands, - PreRunCommands: scope.Config.Spec.PreRunCommands, - PostRunCommands: scope.Config.Spec.PostRunCommands, - KubernetesVersion: scope.Config.Spec.Version, - ExtraFiles: cloudinit.FilesFromAPI(files), - ConfigFileContents: string(joinConfig), - MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, - MicroclusterPort: microclusterPort, - AirGapped: scope.Config.Spec.AirGapped, - NodeName: scope.Config.Spec.NodeName, - NodeToken: *nodeToken, + BootCommands: scope.Config.Spec.BootCommands, + PreRunCommands: scope.Config.Spec.PreRunCommands, + PostRunCommands: scope.Config.Spec.PostRunCommands, + 
KubernetesVersion: scope.Config.Spec.Version, + SnapInstallData: snapInstallData, + ExtraFiles: cloudinit.FilesFromAPI(files), + ConfigFileContents: string(joinConfig), + MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, + MicroclusterPort: microclusterPort, + AirGapped: scope.Config.Spec.AirGapped, + SnapstoreProxyScheme: scope.Config.Spec.SnapstoreProxyScheme, + SnapstoreProxyDomain: scope.Config.Spec.SnapstoreProxyDomain, + SnapstoreProxyID: scope.Config.Spec.SnapstoreProxyID, + NodeName: scope.Config.Spec.NodeName, + NodeToken: *nodeToken, }, JoinToken: joinToken, } @@ -391,6 +403,41 @@ func (r *CK8sConfigReconciler) resolveFiles(ctx context.Context, cfg *bootstrapv return collected, nil } +func (r *CK8sConfigReconciler) resolveInPlaceUpgradeRelease(machine *clusterv1.Machine) cloudinit.SnapInstallData { + mAnnotations := machine.GetAnnotations() + + if mAnnotations == nil { + return cloudinit.SnapInstallData{} + } + + val, ok := mAnnotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation] + if ok { + optionKv := strings.Split(val, "=") + + switch optionKv[0] { + case "channel": + return cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionChannel, + Value: optionKv[1], + } + case "revision": + return cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionRevision, + Value: optionKv[1], + } + case "localPath": + return cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionLocalPath, + Value: optionKv[1], + } + default: + r.Log.Info("Unknown in-place upgrade release option, ignoring", "option", optionKv[0]) + } + } + + return cloudinit.SnapInstallData{} +} + // resolveSecretFileContent returns file content fetched from a referenced secret object.
func (r *CK8sConfigReconciler) resolveSecretFileContent(ctx context.Context, ns string, source bootstrapv1.File) ([]byte, error) { secret := &corev1.Secret{} @@ -529,19 +576,25 @@ func (r *CK8sConfigReconciler) handleClusterNotInitialized(ctx context.Context, return ctrl.Result{}, fmt.Errorf("failed to render k8sd-proxy daemonset: %w", err) } + snapInstallData := r.resolveInPlaceUpgradeRelease(machine) + cpinput := cloudinit.InitControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ - BootCommands: scope.Config.Spec.BootCommands, - PreRunCommands: scope.Config.Spec.PreRunCommands, - PostRunCommands: scope.Config.Spec.PostRunCommands, - KubernetesVersion: scope.Config.Spec.Version, - ExtraFiles: cloudinit.FilesFromAPI(files), - ConfigFileContents: string(initConfig), - MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, - MicroclusterPort: microclusterPort, - NodeName: scope.Config.Spec.NodeName, - AirGapped: scope.Config.Spec.AirGapped, - NodeToken: *nodeToken, + BootCommands: scope.Config.Spec.BootCommands, + PreRunCommands: scope.Config.Spec.PreRunCommands, + PostRunCommands: scope.Config.Spec.PostRunCommands, + KubernetesVersion: scope.Config.Spec.Version, + SnapInstallData: snapInstallData, + ExtraFiles: cloudinit.FilesFromAPI(files), + ConfigFileContents: string(initConfig), + MicroclusterAddress: scope.Config.Spec.ControlPlaneConfig.MicroclusterAddress, + MicroclusterPort: microclusterPort, + NodeName: scope.Config.Spec.NodeName, + AirGapped: scope.Config.Spec.AirGapped, + SnapstoreProxyScheme: scope.Config.Spec.SnapstoreProxyScheme, + SnapstoreProxyDomain: scope.Config.Spec.SnapstoreProxyDomain, + SnapstoreProxyID: scope.Config.Spec.SnapstoreProxyID, + NodeToken: *nodeToken, }, AuthToken: *authToken, K8sdProxyDaemonSet: string(ds), diff --git a/bootstrap/controllers/machine_deployment_controller.go b/bootstrap/controllers/machine_deployment_controller.go new file mode 100644 index 00000000..3aeee66a --- /dev/null +++ 
b/bootstrap/controllers/machine_deployment_controller.go @@ -0,0 +1,414 @@ +package controllers + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/collections" + "sigs.k8s.io/cluster-api/util/patch" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + bootstrapv1 "github.com/canonical/cluster-api-k8s/bootstrap/api/v1beta2" + "github.com/canonical/cluster-api-k8s/pkg/ck8s" + "github.com/canonical/cluster-api-k8s/pkg/trace" +) + +// MachineGetter is an interface that defines the methods a MachineDeploymentReconciler uses to get machines. +type MachineGetter interface { + GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...collections.Func) (collections.Machines, error) +} + +// MachineDeploymentReconciler reconciles a MachineDeployment object and manages the in-place upgrades. +type MachineDeploymentReconciler struct { + scheme *runtime.Scheme + recorder record.EventRecorder + machineGetter MachineGetter + + client.Client + Log logr.Logger +} + +// MachineDeploymentUpgradeScope is a struct that holds the context of the upgrade process. +type MachineDeploymentUpgradeScope struct { + MachineDeployment *clusterv1.MachineDeployment + PatchHelper *patch.Helper + UpgradeTo string + OwnedMachines []*clusterv1.Machine +} + +// SetupWithManager sets up the MachineDeploymentReconciler with the Manager.
+func (r *MachineDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.scheme = mgr.GetScheme() + r.recorder = mgr.GetEventRecorderFor("ck8s-machine-deployment-controller") + + if r.machineGetter == nil { + r.machineGetter = &ck8s.Management{ + Client: r.Client, + } + } + + // NOTE(Hue): Initially, I tried to go with comprehensive predicates but there were two problems with that: + // 1. It was not really understandable and maintainable. + // 2. Sometimes the reconciliation was not getting triggered when it should have, debugging this + // through the predicates was a nightmare. + if err := ctrl.NewControllerManagedBy(mgr). + For(&clusterv1.MachineDeployment{}). + Owns(&clusterv1.Machine{}). + Complete(r); err != nil { + return fmt.Errorf("failed to get new controller builder: %w", err) + } + + return nil +} + +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machinesets;machinesets/status,verbs=get;list;watch +// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machinedeployments;machinedeployments/status,verbs=get;list;watch;create;update;patch;delete + +// Reconcile handles the reconciliation of a MachineDeployment object. +func (r *MachineDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + traceID := trace.NewID() + log := r.Log.WithValues("machine_deployment", req.NamespacedName, "trace_id", traceID) + log.V(1).Info("Reconciliation started...") + + machineDeployment := &clusterv1.MachineDeployment{} + if err := r.Get(ctx, req.NamespacedName, machineDeployment); err != nil { + if apierrors.IsNotFound(err) { + log.V(1).Info("MachineDeployment resource not found.
Ignoring since object must be deleted.") + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("failed to get MachineDeployment: %w", err) + } + + if r.getUpgradeInstructions(machineDeployment) == "" { + log.V(1).Info("MachineDeployment has no upgrade instructions, skipping reconciliation") + return ctrl.Result{}, nil + } + + if !machineDeployment.DeletionTimestamp.IsZero() { + log.V(1).Info("MachineDeployment is being deleted, skipping reconciliation") + return ctrl.Result{}, nil + } + + ownedMachines, err := r.getOwnedMachines(ctx, machineDeployment) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get owned machines: %w", err) + } + + scope, err := r.createScope(machineDeployment, ownedMachines) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create scope: %w", err) + } + + // Starting the upgrade process + var upgradedMachines int + for _, m := range ownedMachines { + if r.isMachineUpgraded(scope, m) { + log.V(1).Info("Machine is already upgraded", "machine", m.Name) + upgradedMachines++ + continue + } + + if !m.DeletionTimestamp.IsZero() { + log.V(1).Info("Machine is being deleted, requeuing...", "machine", m.Name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + + if r.isMachineUpgradeFailed(m) { + log.Info("Machine upgrade failed for machine, requeuing...", "machine", m.Name) + if err := r.markUpgradeFailed(ctx, scope, m); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to mark upgrade as failed: %w", err) + } + + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + + if r.isMachineUpgrading(m) { + log.V(1).Info("Machine is upgrading, requeuing...", "machine", m.Name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + + // Machine is not upgraded, mark it for upgrade + if err := r.markMachineToUpgrade(ctx, scope, m); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to mark machine to upgrade: %w", err) + } + + log.V(1).Info("Machine marked for upgrade", "machine", 
m.Name) + + if err := r.markUpgradeInProgress(ctx, scope, m); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to mark upgrade as in-progress: %w", err) + } + + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + + if upgradedMachines == len(ownedMachines) { + if err := r.markUpgradeDone(ctx, scope); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to mark upgrade as done: %w", err) + } + + log.V(1).Info("All machines are upgraded") + return ctrl.Result{}, nil + } + + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil +} + +// markUpgradeInProgress marks the MachineDeployment as in-place upgrade in-progress. +func (r *MachineDeploymentReconciler) markUpgradeInProgress(ctx context.Context, scope *MachineDeploymentUpgradeScope, upgradingMachine *clusterv1.Machine) error { + mdAnnotations := scope.MachineDeployment.Annotations + if mdAnnotations == nil { + mdAnnotations = make(map[string]string) + } + + // clean up + delete(mdAnnotations, bootstrapv1.InPlaceUpgradeReleaseAnnotation) + + mdAnnotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeInProgressStatus + mdAnnotations[bootstrapv1.InPlaceUpgradeToAnnotation] = scope.UpgradeTo + + scope.MachineDeployment.SetAnnotations(mdAnnotations) + + if err := scope.PatchHelper.Patch(ctx, scope.MachineDeployment); err != nil { + return fmt.Errorf("failed to patch: %w", err) + } + + r.recorder.Eventf( + scope.MachineDeployment, + corev1.EventTypeNormal, + bootstrapv1.InPlaceUpgradeInProgressEvent, + "In-place upgrade is in-progress for %q", + upgradingMachine.Name, + ) + return nil +} + +// markUpgradeDone marks the MachineDeployment as in-place upgrade done. 
+func (r *MachineDeploymentReconciler) markUpgradeDone(ctx context.Context, scope *MachineDeploymentUpgradeScope) error { + annotations := scope.MachineDeployment.Annotations + if annotations == nil { + annotations = make(map[string]string) + } + + // clean up + delete(annotations, bootstrapv1.InPlaceUpgradeToAnnotation) + + annotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeDoneStatus + annotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation] = scope.UpgradeTo + + scope.MachineDeployment.SetAnnotations(annotations) + + if err := scope.PatchHelper.Patch(ctx, scope.MachineDeployment); err != nil { + return fmt.Errorf("failed to patch: %w", err) + } + + r.recorder.Eventf( + scope.MachineDeployment, + corev1.EventTypeNormal, + bootstrapv1.InPlaceUpgradeDoneEvent, + "In-place upgrade is done", + ) + return nil +} + +// markUpgradeFailed marks the MachineDeployment as in-place upgrade failed. +func (r *MachineDeploymentReconciler) markUpgradeFailed(ctx context.Context, scope *MachineDeploymentUpgradeScope, failedM *clusterv1.Machine) error { + annotations := scope.MachineDeployment.Annotations + if annotations == nil { + annotations = make(map[string]string) + } + + // clean up + delete(annotations, bootstrapv1.InPlaceUpgradeReleaseAnnotation) + + annotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeFailedStatus + scope.MachineDeployment.SetAnnotations(annotations) + + if err := scope.PatchHelper.Patch(ctx, scope.MachineDeployment); err != nil { + return fmt.Errorf("failed to patch: %w", err) + } + + r.recorder.Eventf( + scope.MachineDeployment, + corev1.EventTypeWarning, + bootstrapv1.InPlaceUpgradeFailedEvent, + "In-place upgrade failed for machine %q.", + failedM.Name, + ) + return nil +} + +// createScope creates a new MachineDeploymentUpgradeScope. 
+func (r *MachineDeploymentReconciler) createScope(md *clusterv1.MachineDeployment, ownedMachines []*clusterv1.Machine) (*MachineDeploymentUpgradeScope, error) { + patchHelper, err := patch.NewHelper(md, r.Client) + if err != nil { + return nil, fmt.Errorf("failed to create new patch helper: %w", err) + } + + return &MachineDeploymentUpgradeScope{ + MachineDeployment: md, + UpgradeTo: r.getUpgradeInstructions(md), + OwnedMachines: ownedMachines, + PatchHelper: patchHelper, + }, nil +} + +// getCluster gets the Cluster object for the MachineDeployment. +func (r *MachineDeploymentReconciler) getCluster(ctx context.Context, md *clusterv1.MachineDeployment) (*clusterv1.Cluster, error) { + cluster := &clusterv1.Cluster{} + clusterKey := client.ObjectKey{ + Namespace: md.Namespace, + Name: md.Spec.ClusterName, + } + if err := r.Get(ctx, clusterKey, cluster); err != nil { + return nil, fmt.Errorf("failed to get object: %w", err) + } + + return cluster, nil +} + +// getOwnedMachines gets the machines owned by the MachineDeployment. +func (r *MachineDeploymentReconciler) getOwnedMachines(ctx context.Context, md *clusterv1.MachineDeployment) ([]*clusterv1.Machine, error) { + cluster, err := r.getCluster(ctx, md) + if err != nil { + return nil, fmt.Errorf("failed to get cluster: %w", err) + } + + // NOTE(Hue): The machines are not owned by the MachineDeployment directly, but by the MachineSet. 
+ var ( + msList clusterv1.MachineSetList + selector = map[string]string{ + clusterv1.ClusterNameLabel: cluster.Name, + clusterv1.MachineDeploymentNameLabel: md.Name, + } + ) + if err := r.List(ctx, &msList, client.InNamespace(cluster.Namespace), client.MatchingLabels(selector)); err != nil { + return nil, fmt.Errorf("failed to get MachineSetList: %w", err) + } + + var ( + ms clusterv1.MachineSet + found bool + ) + // NOTE(Hue): The nosec is due to a false positive: https://stackoverflow.com/questions/62446118/implicit-memory-aliasing-in-for-loop + for _, _ms := range msList.Items { // #nosec G601 + if util.IsOwnedByObject(&_ms, md) { + ms = _ms + found = true + break + } + } + + if !found { + return nil, fmt.Errorf("failed to find MachineSet owned by MachineDeployment %q", md.Name) + } + + ownedMachinesCollection, err := r.machineGetter.GetMachinesForCluster(ctx, client.ObjectKeyFromObject(cluster), collections.OwnedMachines(&ms)) + if err != nil { + return nil, fmt.Errorf("failed to get cluster machines: %w", err) + } + + ownedMachines := make([]*clusterv1.Machine, len(ownedMachinesCollection)) + i := 0 + for _, m := range ownedMachinesCollection { + ownedMachines[i] = m + i++ + } + + // NOTE(Hue): Sorting machines by their UID to make sure we have a deterministic order. + // This is to (kind of) make sure we upgrade the machines in the same order every time. + // Meaning that if in the previous reconciliation we annotated a machine with upgrade-to, + // In the next reconciliation we will make sure that upgrade was successful before moving + // to the next machine. + // This is not the most robust way to do this, but it's good enough. + // A better way to do this might be to use some kind of lock (via a secret or something), + // similar to control plane init lock. 
+ slices.SortStableFunc(ownedMachines, func(m1, m2 *clusterv1.Machine) int { + switch { + case m1.UID < m2.UID: + return -1 + case m1.UID == m2.UID: + return 0 + default: + return 1 + } + }) + + return ownedMachines, nil +} + +// isMachineUpgraded checks if the machine is already upgraded. +func (r *MachineDeploymentReconciler) isMachineUpgraded(scope *MachineDeploymentUpgradeScope, m *clusterv1.Machine) bool { + mUpgradeRelease := m.Annotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation] + return mUpgradeRelease == scope.UpgradeTo +} + +// isMachineUpgrading checks if the machine is upgrading. +func (r *MachineDeploymentReconciler) isMachineUpgrading(m *clusterv1.Machine) bool { + return m.Annotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] == bootstrapv1.InPlaceUpgradeInProgressStatus || + m.Annotations[bootstrapv1.InPlaceUpgradeToAnnotation] != "" +} + +// isMachineUpgradeFailed checks if the machine upgrade failed. +func (r *MachineDeploymentReconciler) isMachineUpgradeFailed(m *clusterv1.Machine) bool { + return m.Annotations[bootstrapv1.InPlaceUpgradeLastFailedAttemptAtAnnotation] != "" +} + +// markMachineToUpgrade marks the machine to upgrade. 
+func (r *MachineDeploymentReconciler) markMachineToUpgrade(ctx context.Context, scope *MachineDeploymentUpgradeScope, m *clusterv1.Machine) error { + patchHelper, err := patch.NewHelper(m, r.Client) + if err != nil { + return fmt.Errorf("failed to create new patch helper: %w", err) + } + + if m.Annotations == nil { + m.Annotations = make(map[string]string) + } + + // clean up + delete(m.Annotations, bootstrapv1.InPlaceUpgradeReleaseAnnotation) + delete(m.Annotations, bootstrapv1.InPlaceUpgradeStatusAnnotation) + delete(m.Annotations, bootstrapv1.InPlaceUpgradeChangeIDAnnotation) + delete(m.Annotations, bootstrapv1.InPlaceUpgradeLastFailedAttemptAtAnnotation) + + m.Annotations[bootstrapv1.InPlaceUpgradeToAnnotation] = scope.UpgradeTo + + if err := patchHelper.Patch(ctx, m); err != nil { + return fmt.Errorf("failed to patch: %w", err) + } + + r.recorder.Eventf( + scope.MachineDeployment, + corev1.EventTypeNormal, + bootstrapv1.InPlaceUpgradeInProgressEvent, + "Machine %q is upgrading to %q", + m.Name, + scope.UpgradeTo, + ) + + return nil +} + +func (r *MachineDeploymentReconciler) getUpgradeInstructions(md *clusterv1.MachineDeployment) string { + // NOTE(Hue): The reason we are checking the `release` annotation as well is that we want to make sure + // we upgrade the new machines that joined after the initial upgrade process. + // The `upgrade-to` overwrites the `release` annotation, because we might have both in case + // the user decides to do another in-place upgrade after a successful one. 
+ upgradeTo := md.Annotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation] + if to, ok := md.Annotations[bootstrapv1.InPlaceUpgradeToAnnotation]; ok { + upgradeTo = to + } + + return upgradeTo +} diff --git a/bootstrap/controllers/upgrade_controller.go b/bootstrap/controllers/upgrade_controller.go index 4daaa296..cc21d849 100644 --- a/bootstrap/controllers/upgrade_controller.go +++ b/bootstrap/controllers/upgrade_controller.go @@ -191,6 +191,9 @@ func (r *InPlaceUpgradeReconciler) markUpgradeFailed(ctx context.Context, scope mAnnotations := scope.Machine.GetAnnotations() mAnnotations[bootstrapv1.InPlaceUpgradeStatusAnnotation] = bootstrapv1.InPlaceUpgradeFailedStatus + // NOTE(Hue): Add an annotation here to indicate that the upgrade failed + // and we are not going to retry it. + mAnnotations[bootstrapv1.InPlaceUpgradeLastFailedAttemptAtAnnotation] = time.Now().Format(time.RFC1123Z) scope.Machine.SetAnnotations(mAnnotations) if err := scope.PatchHelper.Patch(ctx, scope.Machine); err != nil { return fmt.Errorf("failed to patch machine annotations: %w", err) @@ -275,6 +278,7 @@ func (r *InPlaceUpgradeReconciler) handleUpgradeDone(ctx context.Context, scope delete(mAnnotations, bootstrapv1.InPlaceUpgradeToAnnotation) delete(mAnnotations, bootstrapv1.InPlaceUpgradeChangeIDAnnotation) + delete(mAnnotations, bootstrapv1.InPlaceUpgradeLastFailedAttemptAtAnnotation) mAnnotations[bootstrapv1.InPlaceUpgradeReleaseAnnotation] = scope.UpgradeOption scope.Machine.SetAnnotations(mAnnotations) if err := scope.PatchHelper.Patch(ctx, scope.Machine); err != nil { @@ -287,6 +291,13 @@ func (r *InPlaceUpgradeReconciler) handleUpgradeDone(ctx context.Context, scope func (r *InPlaceUpgradeReconciler) handleUpgradeFailed(ctx context.Context, scope *UpgradeScope) (reconcile.Result, error) { mAnnotations := scope.Machine.GetAnnotations() + // NOTE(Hue): We don't remove the `LastFailedAttemptAt` annotation here + // because we want to know if the upgrade failed at some point in the 
`MachineDeploymentReconciler`. + // This function triggers a retry by removing the `Status` and `ChangeID` annotations, + // but the `LastFailedAttemptAt` lets us discriminate between a retry and a fresh upgrade. + // Overall, we don't remove the `LastFailedAttemptAt` annotation in the `InPlaceUpgradeReconciler`. + // That's the responsibility of the `MachineDeploymentReconciler`. + delete(mAnnotations, bootstrapv1.InPlaceUpgradeStatusAnnotation) delete(mAnnotations, bootstrapv1.InPlaceUpgradeChangeIDAnnotation) scope.Machine.SetAnnotations(mAnnotations) diff --git a/bootstrap/main.go b/bootstrap/main.go index c288b208..974182cd 100644 --- a/bootstrap/main.go +++ b/bootstrap/main.go @@ -120,6 +120,14 @@ func main() { os.Exit(1) } + if err = (&controllers.MachineDeploymentReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("MachineDeployment"), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "MachineDeployment") + os.Exit(1) + } + if os.Getenv("ENABLE_WEBHOOKS") != "false" { if err = (&bootstrapv1.CK8sConfig{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "CK8sConfig") diff --git a/controlplane/api/v1beta2/ck8scontrolplane_types.go b/controlplane/api/v1beta2/ck8scontrolplane_types.go index b0209858..074de64b 100644 --- a/controlplane/api/v1beta2/ck8scontrolplane_types.go +++ b/controlplane/api/v1beta2/ck8scontrolplane_types.go @@ -34,9 +34,6 @@ const ( // This annotation is used to detect any changes in ClusterConfiguration and trigger machine rollout in KCP. CK8sServerConfigurationAnnotation = "controlplane.cluster.x-k8s.io/ck8s-server-configuration" - // SkipCoreDNSAnnotation annotation explicitly skips reconciling CoreDNS if set.
- SkipCoreDNSAnnotation = "controlplane.cluster.x-k8s.io/skip-coredns" - // RemediationInProgressAnnotation is used to keep track that a KCP remediation is in progress, and more // specifically it tracks that the system is in between having deleted an unhealthy machine and recreating its replacement. // NOTE: if something external to CAPI removes this annotation the system cannot detect the above situation; this can lead to diff --git a/controlplane/api/v1beta2/condition_consts.go b/controlplane/api/v1beta2/condition_consts.go index 65bbe02d..e25e1164 100644 --- a/controlplane/api/v1beta2/condition_consts.go +++ b/controlplane/api/v1beta2/condition_consts.go @@ -97,30 +97,6 @@ const ( PodInspectionFailedReason = "PodInspectionFailed" ) -const ( - // EtcdClusterHealthyCondition documents the overall etcd cluster's health. - EtcdClusterHealthyCondition clusterv1.ConditionType = "EtcdClusterHealthyCondition" - - // EtcdClusterInspectionFailedReason documents a failure in inspecting the etcd cluster status. - EtcdClusterInspectionFailedReason = "EtcdClusterInspectionFailed" - - // EtcdClusterUnknownReason reports an etcd cluster in unknown status. - EtcdClusterUnknownReason = "EtcdClusterUnknown" - - // EtcdClusterUnhealthyReason (Severity=Error) is set when the etcd cluster is unhealthy. - EtcdClusterUnhealthyReason = "EtcdClusterUnhealthy" - - // MachineEtcdMemberHealthyCondition report the machine's etcd member's health status. - // NOTE: This conditions exists only if a stacked etcd cluster is used. - MachineEtcdMemberHealthyCondition clusterv1.ConditionType = "EtcdMemberHealthy" - - // EtcdMemberInspectionFailedReason documents a failure in inspecting the etcd member status. - EtcdMemberInspectionFailedReason = "MemberInspectionFailed" - - // EtcdMemberUnhealthyReason (Severity=Error) documents a Machine's etcd member is unhealthy. 
- EtcdMemberUnhealthyReason = "EtcdMemberUnhealthy" -) - const ( // TokenAvailableCondition documents whether the token required for nodes to join the cluster is available. TokenAvailableCondition clusterv1.ConditionType = "TokenAvailable" diff --git a/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanes.yaml b/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanes.yaml index 98b95aa4..ba5ce55c 100644 --- a/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanes.yaml +++ b/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanes.yaml @@ -407,6 +407,21 @@ spec: items: type: string type: array + snapstoreProxyDomain: + description: The snap store proxy domain + type: string + snapstoreProxyId: + description: The snap store proxy ID + type: string + snapstoreProxyScheme: + default: http + description: |- + The snap store proxy domain's scheme, e.g. "http" or "https" without "://" + Defaults to "http". + enum: + - http + - https + type: string version: description: Version specifies the Kubernetes version. type: string diff --git a/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanetemplates.yaml b/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanetemplates.yaml index 37f1ea2b..b8c41013 100644 --- a/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanetemplates.yaml +++ b/controlplane/config/crd/bases/controlplane.cluster.x-k8s.io_ck8scontrolplanetemplates.yaml @@ -384,6 +384,21 @@ spec: items: type: string type: array + snapstoreProxyDomain: + description: The snap store proxy domain + type: string + snapstoreProxyId: + description: The snap store proxy ID + type: string + snapstoreProxyScheme: + default: http + description: |- + The snap store proxy domain's scheme, e.g. "http" or "https" without "://" + Defaults to "http". 
+ enum: + - http + - https + type: string version: description: Version specifies the Kubernetes version. type: string diff --git a/controlplane/controllers/ck8scontrolplane_controller.go b/controlplane/controllers/ck8scontrolplane_controller.go index 67ffe565..3da984b8 100644 --- a/controlplane/controllers/ck8scontrolplane_controller.go +++ b/controlplane/controllers/ck8scontrolplane_controller.go @@ -18,12 +18,12 @@ package controllers import ( "context" + "errors" "fmt" "strings" "time" "github.com/go-logr/logr" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -196,7 +196,7 @@ func (r *CK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cluste return reconcile.Result{}, err } - // Updates conditions reporting the status of static pods and the status of the etcd cluster. + // Updates conditions reporting the status of static pods // NOTE: Ignoring failures given that we are deleting if err := r.reconcileControlPlaneConditions(ctx, controlPlane); err != nil { logger.Info("failed to reconcile conditions", "error", err.Error()) @@ -497,22 +497,20 @@ func (r *CK8sControlPlaneReconciler) reconcile(ctx context.Context, cluster *clu return reconcile.Result{}, err } + if err := r.syncMachines(ctx, kcp, controlPlane); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to sync Machines: %w", err) + } + // Aggregate the operational state of all the machines; while aggregating we are adding the // source ref (reason@machine/name) so the problem can be easily tracked down to its source machine. conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false)) - // Updates conditions reporting the status of static pods and the status of the etcd cluster. 
+ // Updates conditions reporting the status of static pods // NOTE: Conditions reporting KCP operation progress like e.g. Resized or SpecUpToDate are inlined with the rest of the execution. if err := r.reconcileControlPlaneConditions(ctx, controlPlane); err != nil { return reconcile.Result{}, err } - // Ensures the number of etcd members is in sync with the number of machines/nodes. - // NOTE: This is usually required after a machine deletion. - if err := r.reconcileEtcdMembers(ctx, controlPlane); err != nil { - return reconcile.Result{}, err - } - // Reconcile unhealthy machines by triggering deletion and requeue if it is considered safe to remediate, // otherwise continue with the other KCP operations. if result, err := r.reconcileUnhealthyMachines(ctx, controlPlane); err != nil || !result.IsZero() { @@ -558,26 +556,6 @@ func (r *CK8sControlPlaneReconciler) reconcile(ctx context.Context, cluster *clu return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane, collections.Machines{}) } - // Get the workload cluster client. - /** - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) - if err != nil { - logger.V(2).Info("cannot get remote client to workload cluster, will requeue", "cause", err) - return ctrl.Result{Requeue: true}, nil - } - - // Update kube-proxy daemonset. - if err := workloadCluster.UpdateKubeProxyImageInfo(ctx, kcp); err != nil { - logger.Error(err, "failed to update kube-proxy daemonset") - return reconcile.Result{}, err - } - - // Update CoreDNS deployment. 
- if err := workloadCluster.UpdateCoreDNS(ctx, kcp); err != nil { - return reconcile.Result{}, fmt.Errorf("failed to update CoreDNS deployment") - } - **/ - return reconcile.Result{}, nil } @@ -657,8 +635,7 @@ func (r *CK8sControlPlaneReconciler) reconcileKubeconfig(ctx context.Context, cl return reconcile.Result{}, nil } -// reconcileControlPlaneConditions is responsible of reconciling conditions reporting the status of static pods and -// the status of the etcd cluster. +// reconcileControlPlaneConditions is responsible of reconciling conditions reporting the status of static pods. func (r *CK8sControlPlaneReconciler) reconcileControlPlaneConditions(ctx context.Context, controlPlane *ck8s.ControlPlane) error { // If the cluster is not yet initialized, there is no way to connect to the workload cluster and fetch information // for updating conditions. Return early. @@ -674,7 +651,6 @@ func (r *CK8sControlPlaneReconciler) reconcileControlPlaneConditions(ctx context // Update conditions status workloadCluster.UpdateAgentConditions(ctx, controlPlane) - workloadCluster.UpdateEtcdConditions(ctx, controlPlane) // Patch machines with the updated conditions. if err := controlPlane.PatchMachines(ctx); err != nil { @@ -685,63 +661,36 @@ func (r *CK8sControlPlaneReconciler) reconcileControlPlaneConditions(ctx context return nil } -// reconcileEtcdMembers ensures the number of etcd members is in sync with the number of machines/nodes. -// This is usually required after a machine deletion. -// -// NOTE: this func uses KCP conditions, it is required to call reconcileControlPlaneConditions before this. -func (r *CK8sControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, controlPlane *ck8s.ControlPlane) error { - // NOTE(neoaggelos): Upstream uses this to reach the etcd cluster and remove any members that have not yet - // been removed, typically after a machine has been deleted. 
In the case of k8s-dqlite, this is handled automatically - // for us, so we do not need to do anything here. - // - // We still leave this code around in case we need to do work in the future (e.g. make sure any removed nodes do not - // still appear on microcluster or k8s-dqlite). - - /** - log := ctrl.LoggerFrom(ctx) - // If k8s-dqlite is not managed by KCP this is a no-op. - if !controlPlane.IsEtcdManaged() { - return nil - } - - // If there is no KCP-owned control-plane machines, then control-plane has not been initialized yet. - if controlPlane.Machines.Len() == 0 { - return nil - } +func (r *CK8sControlPlaneReconciler) syncMachines(ctx context.Context, kcp *controlplanev1.CK8sControlPlane, controlPlane *ck8s.ControlPlane) error { + for machineName := range controlPlane.Machines { + m := controlPlane.Machines[machineName] + // If the machine is already being deleted, we don't need to update it. + if !m.DeletionTimestamp.IsZero() { + continue + } - // Collect all the node names. - nodeNames := []string{} - for _, machine := range controlPlane.Machines { - if machine.Status.NodeRef == nil { - // If there are provisioning machines (machines without a node yet), return. - return nil + patchHelper, err := patch.NewHelper(m, r.Client) + if err != nil { + return fmt.Errorf("failed to create patch helper for machine: %w", err) } - nodeNames = append(nodeNames, machine.Status.NodeRef.Name) - } - // Potential inconsistencies between the list of members and the list of machines/nodes are - // surfaced using the EtcdClusterHealthyCondition; if this condition is true, meaning no inconsistencies exists, return early. - if conditions.IsTrue(controlPlane.KCP, controlplanev1.EtcdClusterHealthyCondition) { - return nil - } + // Create a new map if machine has no annotations. 
+ if m.Annotations == nil { + m.Annotations = map[string]string{} + } - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) - if err != nil { - // Failing at connecting to the workload cluster can mean workload cluster is unhealthy for a variety of reasons such as etcd quorum loss. - return errors.Wrap(err, "cannot get remote client to workload cluster") - } + // Set annotations + // Add the annotations from the MachineTemplate. + for k, v := range kcp.Spec.MachineTemplate.ObjectMeta.Annotations { + m.Annotations[k] = v + } - removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames) - if err != nil { - return errors.Wrap(err, "failed attempt to reconcile etcd members") - } + if err := patchHelper.Patch(ctx, m); err != nil { + return fmt.Errorf("failed to patch machine annotations: %w", err) + } - if len(removedMembers) > 0 { - log.Info("Etcd members without nodes removed from the cluster", "members", removedMembers) + controlPlane.Machines[machineName] = m } - - **/ - return nil } diff --git a/controlplane/controllers/machine_controller.go b/controlplane/controllers/machine_controller.go index 20659c9d..4e2fbf21 100644 --- a/controlplane/controllers/machine_controller.go +++ b/controlplane/controllers/machine_controller.go @@ -2,10 +2,10 @@ package controllers import ( "context" + "fmt" "time" "github.com/go-logr/logr" - "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -34,15 +34,10 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag For(&clusterv1.Machine{}). 
Build(r) - // NOTE(neoaggelos): See note below if r.managementCluster == nil { r.managementCluster = &ck8s.Management{ Client: r.Client, K8sdDialTimeout: r.K8sdDialTimeout, - /* - EtcdDialTimeout: r.EtcdDialTimeout, - EtcdCallTimeout: r.EtcdCallTimeout, - */ } } @@ -51,6 +46,7 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag // +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch // +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines;machines/status,verbs=get;list;watch;create;update;patch;delete + func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := r.Log.WithValues("namespace", req.Namespace, "machine", req.Name) @@ -95,42 +91,16 @@ func (r *MachineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // Note that this currently makes the annotation a no-op in the code here. However, we still keep the logic in the code is case it // is needed in the future. 
- /** - cluster, err := util.GetClusterFromMetadata(ctx, r.Client, m.ObjectMeta) - if err != nil { - logger.Info("unable to get cluster.") - return ctrl.Result{}, errors.Wrapf(err, "unable to get cluster") - } - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) - if err != nil { - logger.Error(err, "failed to create client to workload cluster") - return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") - } - - etcdRemoved, err := workloadCluster.RemoveEtcdMemberForMachine(ctx, m) - if err != nil { - logger.Error(err, "failed to remove etcd member for machine") - return ctrl.Result{}, err - } - if !etcdRemoved { - logger.Info("wait embedded etcd controller to remove etcd") - return ctrl.Result{Requeue: true}, err - } - - // It is possible that the machine has no machine ref yet, will record the machine name in log - logger.Info("etcd remove etcd member succeeded", "machine name", m.Name) - **/ - patchHelper, err := patch.NewHelper(m, r.Client) if err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine") + return ctrl.Result{}, fmt.Errorf("failed to create patch helper for machine: %w", err) } mAnnotations := m.GetAnnotations() delete(mAnnotations, clusterv1.PreTerminateDeleteHookAnnotationPrefix) m.SetAnnotations(mAnnotations) if err := patchHelper.Patch(ctx, m); err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed patch machine") + return ctrl.Result{}, fmt.Errorf("failed to patch machine: %w", err) } } diff --git a/controlplane/controllers/remediation.go b/controlplane/controllers/remediation.go index 2baa7c9b..8139a16d 100644 --- a/controlplane/controllers/remediation.go +++ b/controlplane/controllers/remediation.go @@ -23,7 +23,6 @@ import ( "time" "github.com/go-logr/logr" - "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" @@ -56,7 +55,7 @@ func (r 
*CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont m.DeletionTimestamp.IsZero() { patchHelper, err := patch.NewHelper(m, r.Client) if err != nil { - errList = append(errList, errors.Wrapf(err, "failed to get PatchHelper for machine %s", m.Name)) + errList = append(errList, fmt.Errorf("failed to get PatchHelper for machine %s: %w", m.Name, err)) continue } @@ -65,7 +64,7 @@ func (r *CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont if err := patchHelper.Patch(ctx, m, patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{ clusterv1.MachineOwnerRemediatedCondition, }}); err != nil { - errList = append(errList, errors.Wrapf(err, "failed to patch machine %s", m.Name)) + errList = append(errList, fmt.Errorf("failed to patch machine %s: %w", m.Name, err)) } } } @@ -117,7 +116,7 @@ func (r *CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont }}); err != nil { log.Error(err, "Failed to patch control plane Machine", "Machine", machineToBeRemediated.Name) if retErr == nil { - retErr = errors.Wrapf(err, "failed to patch control plane Machine %s", machineToBeRemediated.Name) + retErr = fmt.Errorf("failed to patch control plane Machine %s: %w", machineToBeRemediated.Name, err) } } }() @@ -167,74 +166,7 @@ func (r *CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont // so that the cluster does not lock and cause the cluster to go down. In the case of k8s-dqlite, this is automatically handled by the // go-dqlite layer, and Canonical Kubernetes has logic to automatically keep a quorum of nodes in normal operation. // - // Therefore, we currently disable this check for simplicity, but should remember that we need this precondition before proceeing. - - /** - // Remediation MUST preserve etcd quorum. This rule ensures that KCP will not remove a member that would result in etcd - // losing a majority of members and thus become unable to field new requests. 
- if controlPlane.IsEtcdManaged() { - canSafelyRemediate, err := r.canSafelyRemoveEtcdMember(ctx, controlPlane, machineToBeRemediated) - if err != nil { - conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error()) - return ctrl.Result{}, err - } - if !canSafelyRemediate { - log.Info("A control plane machine needs remediation, but removing this machine could result in etcd quorum loss. Skipping remediation") - conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.WaitingForRemediationReason, clusterv1.ConditionSeverityWarning, "KCP can't remediate this machine because this could result in etcd loosing quorum") - return ctrl.Result{}, nil - } - } - **/ - - // Start remediating the unhealthy control plane machine by deleting it. - // A new machine will come up completing the operation as part of the regular reconcile. - - // NOTE(neoaggelos): Here, upstream will check whether the node that is about to be removed is the leader of the etcd cluster, and will - // attempt to forward the leadership to a different active node before proceeding. This is so that continuous operation of the cluster - // is preserved. - // - // TODO(neoaggelos): For Canonical Kubernetes, we should instead use the RemoveNode endpoint of the k8sd service from a different control - // plane node (through the k8sd-proxy), which will handle this operation for us. If that fails, we must not proceed. - - /** - // If the control plane is initialized, before deleting the machine: - // - if the machine hosts the etcd leader, forward etcd leadership to another machine. - // - delete the etcd member hosted on the machine being deleted. 
- workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) - if err != nil { - log.Error(err, "Failed to create client to workload cluster") - return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") - } - - // If the machine that is about to be deleted is the etcd leader, move it to the newest member available. - if controlPlane.IsEtcdManaged() { - etcdLeaderCandidate := controlPlane.HealthyMachines().Newest() - if etcdLeaderCandidate == nil { - log.Info("A control plane machine needs remediation, but there is no healthy machine to forward etcd leadership to") - conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityWarning, - "A control plane machine needs remediation, but there is no healthy machine to forward etcd leadership to. Skipping remediation") - return ctrl.Result{}, nil - } - if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToBeRemediated, etcdLeaderCandidate); err != nil { - log.Error(err, "Failed to move etcd leadership to candidate machine", "candidate", klog.KObj(etcdLeaderCandidate)) - conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error()) - return ctrl.Result{}, err - } - - patchHelper, err := patch.NewHelper(machineToBeRemediated, r.Client) - if err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine") - } - - mAnnotations := machineToBeRemediated.GetAnnotations() - mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = ck8sHookName - machineToBeRemediated.SetAnnotations(mAnnotations) - - if err := patchHelper.Patch(ctx, machineToBeRemediated); err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook") - } - } - **/ + // Therefore, we have removed this 
check for simplicity, but should remember that we need this precondition before proceeding. } microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort() @@ -242,7 +174,7 @@ func (r *CK8sControlPlaneReconciler) reconcileUnhealthyMachines(ctx context.Cont workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, clusterObjectKey, microclusterPort) if err != nil { log.Error(err, "failed to create client to workload cluster") - return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") + return ctrl.Result{}, fmt.Errorf("failed to create client to workload cluster: %w", err) } if err := workloadCluster.RemoveMachineFromCluster(ctx, machineToBeRemediated); err != nil { @@ -252,7 +184,7 @@ // Delete the machine if err := r.Client.Delete(ctx, machineToBeRemediated); err != nil { conditions.MarkFalse(machineToBeRemediated, clusterv1.MachineOwnerRemediatedCondition, clusterv1.RemediationFailedReason, clusterv1.ConditionSeverityError, err.Error()) - return ctrl.Result{}, errors.Wrapf(err, "failed to delete unhealthy machine %s", machineToBeRemediated.Name) + return ctrl.Result{}, fmt.Errorf("failed to delete unhealthy machine %s: %w", machineToBeRemediated.Name, err) } // Surface the operation is in progress. @@ -381,110 +313,6 @@ func maxDuration(x, y time.Duration) time.Duration { return x } -// NOTE(neoaggelos): See note above. Implementation kept here for future reference, only remove once the NOTEs and TODOs in the reconcileUnhealthyMachines -// have been fully addressed and are well-tested. - -//nolint:godot -/** -// canSafelyRemoveEtcdMember assess if it is possible to remove the member hosted on the machine to be remediated -// without loosing etcd quorum.
-// -// The answer mostly depend on the existence of other failing members on top of the one being deleted, and according -// to the etcd fault tolerance specification (see https://etcd.io/docs/v3.3/faq/#what-is-failure-tolerance): -// - 3 CP cluster does not tolerate additional failing members on top of the one being deleted (the target -// cluster size after deletion is 2, fault tolerance 0) -// - 5 CP cluster tolerates 1 additional failing members on top of the one being deleted (the target -// cluster size after deletion is 4, fault tolerance 1) -// - 7 CP cluster tolerates 2 additional failing members on top of the one being deleted (the target -// cluster size after deletion is 6, fault tolerance 2) -// - etc. -// -// NOTE: this func assumes the list of members in sync with the list of machines/nodes, it is required to call reconcileEtcdMembers -// as well as reconcileControlPlaneConditions before this. -// -// adapted from kubeadm controller and makes the assumption that the set of controplane nodes equals the set of etcd nodes. -func (r *CK8sControlPlaneReconciler) canSafelyRemoveEtcdMember(ctx context.Context, controlPlane *ck8s.ControlPlane, machineToBeRemediated *clusterv1.Machine) (bool, error) { - log := ctrl.LoggerFrom(ctx) - - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) - if err != nil { - return false, errors.Wrapf(err, "failed to get client for workload cluster %s", controlPlane.Cluster.Name) - } - - // Gets the etcd status - - // This makes it possible to have a set of etcd members status different from the MHC unhealthy/unhealthy conditions. 
- etcdMembers, err := workloadCluster.EtcdMembers(ctx) - if err != nil { - return false, errors.Wrapf(err, "failed to get etcdStatus for workload cluster %s", controlPlane.Cluster.Name) - } - - currentTotalMembers := len(etcdMembers) - - log.Info("etcd cluster before remediation", - "currentTotalMembers", currentTotalMembers) - - // Projects the target etcd cluster after remediation, considering all the etcd members except the one being remediated. - targetTotalMembers := 0 - targetUnhealthyMembers := 0 - - healthyMembers := []string{} - unhealthyMembers := []string{} - for _, etcdMember := range etcdMembers { - // Skip the machine to be deleted because it won't be part of the target etcd cluster. - if machineToBeRemediated.Status.NodeRef != nil && machineToBeRemediated.Status.NodeRef.Name == etcdMember { - continue - } - - // Include the member in the target etcd cluster. - targetTotalMembers++ - - // Search for the machine corresponding to the etcd member. - var machine *clusterv1.Machine - for _, m := range controlPlane.Machines { - if m.Status.NodeRef != nil && m.Status.NodeRef.Name == etcdMember { - machine = m - break - } - } - - // If an etcd member does not have a corresponding machine it is not possible to retrieve etcd member health, - // so KCP is assuming the worst scenario and considering the member unhealthy. - // - // NOTE: This should not happen given that KCP is running reconcileEtcdMembers before calling this method. 
- if machine == nil { - log.Info("An etcd member does not have a corresponding machine, assuming this member is unhealthy", "MemberName", etcdMember) - targetUnhealthyMembers++ - unhealthyMembers = append(unhealthyMembers, fmt.Sprintf("%s (no machine)", etcdMember)) - continue - } - - // Check member health as reported by machine's health conditions - if !conditions.IsTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition) { - targetUnhealthyMembers++ - unhealthyMembers = append(unhealthyMembers, fmt.Sprintf("%s (%s)", etcdMember, machine.Name)) - continue - } - - healthyMembers = append(healthyMembers, fmt.Sprintf("%s (%s)", etcdMember, machine.Name)) - } - - // See https://etcd.io/docs/v3.3/faq/#what-is-failure-tolerance for fault tolerance formula explanation. - targetQuorum := (targetTotalMembers / 2.0) + 1 - canSafelyRemediate := targetTotalMembers-targetUnhealthyMembers >= targetQuorum - - log.Info(fmt.Sprintf("etcd cluster projected after remediation of %s", machineToBeRemediated.Name), - "healthyMembers", healthyMembers, - "unhealthyMembers", unhealthyMembers, - "targetTotalMembers", targetTotalMembers, - "targetQuorum", targetQuorum, - "targetUnhealthyMembers", targetUnhealthyMembers, - "canSafelyRemediate", canSafelyRemediate) - - return canSafelyRemediate, nil -} -**/ - // RemediationData struct is used to keep track of information stored in the RemediationInProgressAnnotation in KCP // during remediation and then into the RemediationForAnnotation on the replacement machine once it is created. 
type RemediationData struct { @@ -503,7 +331,7 @@ type RemediationData struct { func RemediationDataFromAnnotation(value string) (*RemediationData, error) { ret := &RemediationData{} if err := json.Unmarshal([]byte(value), ret); err != nil { - return nil, errors.Wrapf(err, "failed to unmarshal value %s for %s annotation", value, clusterv1.RemediationInProgressReason) + return nil, fmt.Errorf("failed to unmarshal value %s for %s annotation: %w", value, clusterv1.RemediationInProgressReason, err) } return ret, nil } @@ -512,7 +340,7 @@ func RemediationDataFromAnnotation(value string) (*RemediationData, error) { func (r *RemediationData) Marshal() (string, error) { b, err := json.Marshal(r) if err != nil { - return "", errors.Wrapf(err, "failed to marshal value for %s annotation", clusterv1.RemediationInProgressReason) + return "", fmt.Errorf("failed to marshal value for %s annotation: %w", clusterv1.RemediationInProgressReason, err) } return string(b), nil } diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go index de0f3777..1b31f50d 100644 --- a/controlplane/controllers/scale.go +++ b/controlplane/controllers/scale.go @@ -19,10 +19,10 @@ package controllers import ( "context" "encoding/json" + "errors" "fmt" "strings" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -119,52 +119,12 @@ func (r *CK8sControlPlaneReconciler) scaleDownControlPlane( return ctrl.Result{}, fmt.Errorf("failed to pick control plane Machine to delete: %w", err) } - // NOTE(neoaggelos): Here, upstream will check whether the node that is about to be removed is the leader of the etcd cluster, and will - // attempt to forward the leadership to a different active node before proceeding. This is so that continuous operation of the cluster - // is preserved. 
- // - // TODO(neoaggelos): For Canonical Kubernetes, we should instead use the RemoveNode endpoint of the k8sd service from a different control - // plane node (through the k8sd-proxy), which will handle this operation for us. If that fails, we must not proceed. - // - // Finally, note that upstream only acts if the cluster has a managed etcd. For Canonical Kubernetes, we must always perform this action, - // since we must delete the node from microcluster as well. - - /** - // If KCP should manage etcd, If etcd leadership is on machine that is about to be deleted, move it to the newest member available. - if controlPlane.IsEtcdManaged() { - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) - if err != nil { - logger.Error(err, "Failed to create client to workload cluster") - return ctrl.Result{}, fmt.Errorf("failed to create client to workload cluster: %w", err) - } - - etcdLeaderCandidate := controlPlane.Machines.Newest() - if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil { - logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name) - return ctrl.Result{}, err - } - - patchHelper, err := patch.NewHelper(machineToDelete, r.Client) - if err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to create patch helper for machine") - } - - mAnnotations := machineToDelete.GetAnnotations() - mAnnotations[clusterv1.PreTerminateDeleteHookAnnotationPrefix] = ck8sHookName - machineToDelete.SetAnnotations(mAnnotations) - - if err := patchHelper.Patch(ctx, machineToDelete); err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed patch machine for adding preTerminate hook") - } - } - **/ - microclusterPort := controlPlane.KCP.Spec.CK8sConfigSpec.ControlPlaneConfig.GetMicroclusterPort() clusterObjectKey := util.ObjectKey(cluster) workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, clusterObjectKey, 
microclusterPort) if err != nil { logger.Error(err, "failed to create client to workload cluster") - return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") + return ctrl.Result{}, fmt.Errorf("failed to create client to workload cluster: %w", err) } if err := workloadCluster.RemoveMachineFromCluster(ctx, machineToDelete); err != nil { @@ -208,11 +168,6 @@ func (r *CK8sControlPlaneReconciler) preflightChecks(_ context.Context, controlP // Check machine health conditions; if there are conditions with False or Unknown, then wait. allMachineHealthConditions := []clusterv1.ConditionType{controlplanev1.MachineAgentHealthyCondition} - if controlPlane.IsEtcdManaged() { - allMachineHealthConditions = append(allMachineHealthConditions, - controlplanev1.MachineEtcdMemberHealthyCondition, - ) - } machineErrors := []error{} diff --git a/docs/proposals/003-refresh-certs.md b/docs/proposals/003-refresh-certs.md new file mode 100644 index 00000000..02c0caf9 --- /dev/null +++ b/docs/proposals/003-refresh-certs.md @@ -0,0 +1,373 @@ + + +# Proposal information + + +- **Index**: 003 + + +- **Status**: **ACCEPTED** + + + +- **Name**: ClusterAPI Certificates Refresh + + +- **Owner**: Mateo Florido [@mateoflorido](https://github.com/mateoflorido) + + +# Proposal Details + +## Summary + + +The proposal aims to enhance Canonical Kubernetes Cluster API Providers by +enabling administrators to refresh or renew certificates on cluster nodes +without the need for a rolling upgrade. This feature is particularly beneficial +in resource-constrained environments, such as private or edge clouds, where +performing a full node replacement may not be feasible. + +## Rationale + + +Currently, Cluster API lacks a mechanism for refreshing certificates on cluster +nodes without triggering a full rolling update. 
For example, while the Kubeadm +provider offers the ability to renew certificates, it requires a rolling update +of the cluster nodes or manual intervention before the certificates expire. + +This proposal aims to address this gap by enabling certificate renewal on +cluster nodes without requiring a rolling update. By providing administrators +with the ability to refresh certificates independently of node upgrades, this +feature improves cluster operation, especially in environments with limited +resources, such as private or edge clouds. + +It will enhance the user experience by minimizing downtime, reducing the need +for additional resources, and simplifying certificate management. This is +particularly valuable for users who need to maintain continuous availability +or operate in environments where rolling updates are not practical due to +resource constraints. + + +## User facing changes + + +Administrators will be able to renew certificates on cluster nodes without +triggering a full rolling update. This can be achieved by annotating the Machine +object, which will initiate the certificate renewal process: + +``` +kubectl annotate machine v1beta2.k8sd.io/refresh-certificates={expires-in} +``` + +`expires-in` specifies how long the certificate will remain valid. It can be +expressed in years, months, days, additionally to other time units supported by +the `time.ParseDuration`. + +For tracking the validity of certificates, the Machine object will include a +`machine.cluster.x-k8s.io/certificates-expiry` annotation that indicates the +expiry date of the certificates. This annotation will be added when the cluster +is deployed and updated when certificates are renewed. The value of this +annotation will be a RFC3339 timestamp. + +## Alternative solutions + + +**Kubeadm Control Plane provider (KCP)** automates certificate rotations for +control plane machines by triggering a machine rollout when certificates are +close to expiration. 
+ +### How to configure: +- In the KCP configuration, set the `rolloutBefore.certificatesExpiryDays` +field. This tells KCP when to trigger the rollout before certificates expire: + +```yaml +spec: + rolloutBefore: + certificatesExpiryDays: 21 # Trigger rollout when certificates expire within 21 days +``` + +### How it works: +- **Automatic Rollouts**: KCP monitors the certificate expiry dates of control +plane machines using the `Machine.Status.CertificatesExpiryDate`. If +certificates are about to expire (based on a configured threshold), KCP +triggers a machine rollout to refresh them. +- **Certificate Expiry Check**: The expiry date is sourced from the +`machine.cluster.x-k8s.io/certificates-expiry` annotation on the Machine or +Bootstrap Config object. + +For manual rotations, the administrator should run the `kubeadm certs renew` +command, ensure all control plane components are restarted, and remove the +expiry annotation for KCP to detect the updated certificate expiry date. + + +## Out of scope + + +This proposal does not cover the orchestration of certificate renewal for the +whole cluster. It focuses on renewing certificates on individual cluster nodes +via the Machine object. + +Rolling updates of the cluster nodes are out of scope. This proposal aims to +renew certificates without triggering a full rolling update of the cluster. + +External certificate authorities (CAs) are also out of scope. This proposal +focuses on renewing self-signed certificates generated by Canonical Kubernetes. + +# Implementation Details + +## API Changes + + +### `GET /k8sd/certificates-expiry` + +This endpoint will return the expiry date of the certificates on a specific +cluster node. The response will include the expiry date of the certificates +in RFC3339 format. The value will be sourced from the Kubernetes API server +certificate. + +```go +type CertificatesExpiryResponse struct { + // ExpiryDate is the expiry date of the certificates on the node. 
+ ExpiryDate string `json:"expiry-date"` +} +``` + +### `POST /x/capi/request-certificates` + +This endpoint will create the necessary Certificate Signing Request (CSR) for +a worker node. The request will include the duration after which the +certificates will expire. + +```go +type RequestCertificatesRequest struct { + // ExpirationSeconds is the duration after which the certificates will expire. + ExpirationSeconds int `json:"expiration-seconds"` +} +``` + +### `POST /x/capi/refresh-certificates/plan` + +This endpoint returns the renewal plan for certificates on a specific node. The +response will include the seed used to generate the Certificate Signing Request +(CSR) and a list of CSRs that need to be approved (for worker nodes). + +This endpoint utilizes the same structures and endpoints as the +`POST /k8sd/refresh-certs/plan`. + +```go +type RefreshCertificatesPlanResponse struct { + // Seed should be passed by clients to the RefreshCertificatesRun RPC. + Seed int `json:"seed"` + // CertificateSigningRequests is a list of names of the CertificateSigningRequests that need to be signed externally (for worker nodes). + CertificateSigningRequests []string `json:"certificate-signing-requests"` +} +``` + +### `POST /x/capi/refresh-certificates/run` + +This endpoint will trigger the renewal of certificates on a specific node. +The request will include the duration after which the certificates will expire +and a list of additional Subject Alternative Names (SANs) to include in the +certificate. + +This endpoint is applicable to both control plane and worker nodes. For worker +nodes, the request will include the seed used to generate the CSR. This +endpoint uses the same structures and endpoints as the +`POST /k8sd/refresh-certs/run`. + +```go +type RefreshCertificatesRequest struct { + // Seed is the seed used to generate the CSR. + Seed string `json:"seed"` + // ExpirationSeconds is the duration after which the certificates will expire. 
+ ExpirationSeconds int `json:"expiration-seconds"` + //ExtraSANs is a list of additional Subject Alternative Names to include in the certificate. + ExtraSANs []string `json:"extra-sans"` +} +``` + +### `POST /x/capi/approve-certificates` + +This endpoint will approve the renewal of certificates for a worker node and +will be run by a control plane node. The request will include the seed used to +generate the CSR. + +```go +type ApproveCertificatesRequest struct { + // Seed is the seed used to generate the CSR. + Seed string `json:"seed"` +} +``` + +## Bootstrap Provider Changes + + +A controller called `CertificatesController` will be added to the bootstrap +provider. This controller will watch for the `v1beta2.k8sd.io/refresh-certificates` +annotation on the Machine object and trigger the certificate renewal process +when the annotation is present. + +### Control Plane Nodes + +The controller would use the value of the +`v1beta2.k8sd.io/refresh-certificates`annotation to determine the duration +after which the certificates will expire. It will then call the +`POST /x/capi/refresh-certificates` endpoint to trigger the certificate +renewal process. + +The controller will share the status of the certificate renewal process by +adding events to the Machine object. The events will indicate the progress of +the renewal process following this pattern: + +- `RefreshCertsInProgress`: The certificate renewal process is in progress, the + event will include the `Refreshing certificates in progress` message. +- `RefreshCertsDone`: The certificate renewal process is complete, the event + will include the `Certificates have been refreshed` message. +- `RefreshCertsFailed`: The certificate renewal process has failed, the event + will include the `Certificates renewal failed: {reason}` message. 
+
+After the certificate renewal process is complete, the controller will update
+the `machine.cluster.x-k8s.io/certificates-expiry` annotation on the Machine
+object with the new expiry date of the certificates.
+
+Finally, the controller will remove the `v1beta2.k8sd.io/refresh-certificates`
+annotation from the Machine object to indicate that the certificate renewal
+process is complete.
+
+### Worker Nodes
+
+The controller would use the value of the `v1beta2.k8sd.io/refresh-certificates`
+annotation to determine the duration after which the certificates will expire.
+It will then call the `POST /x/capi/request-certificates` endpoint to create
+the Certificate Signing Request (CSR) for the worker node.
+
+Using the `k8sd` proxy, the controller can call the
+`POST /x/capi/approve-certificates` endpoint with the seed generated in the
+previous step to approve the CSRs for the worker node.
+
+The controller will share the status similarly to the control plane nodes by
+emitting events to the `Machine` object. The events will indicate the progress
+of the renewal process following the same pattern as in the control plane
+nodes.
+
+After the CSR approval process is complete, the worker node will call the
+`POST /x/capi/refresh-certificates` endpoint to trigger the certificate renewal
+process, using the seed generated to recover the certificates from the CSR
+resources.
+
+After the certificate renewal process is complete, the controller will update
+the `machine.cluster.x-k8s.io/certificates-expiry` annotation on the Machine
+object with the new expiry date of the certificates.
+
+Finally, the controller will remove the `v1beta2.k8sd.io/refresh-certificates`
+annotation
+from the Machine object to indicate that the certificate renewal process is
+complete.
+
+## ControlPlane Provider Changes
+
+
+None
+
+## Configuration Changes
+
+
+None
+
+## Documentation Changes
+
+
+This implementation will require adding the following documentation:
+- How-to guide for renewing certificates on cluster nodes
+- Reference page of the `v1beta2.k8sd.io/refresh-certificates` annotation
+
+## Testing
+
+
+Integration tests will be added to the current test suite. The tests will
+create a cluster, annotate the Machine object with the
+`v1beta2.k8sd.io/refresh-certificates` annotation, and verify that the
+certificates are renewed in the target node.
+
+## Considerations for backwards compatibility
+
+
+None
+
+## Implementation notes and guidelines
+
+
+We can leverage the existing certificate renewal logic in the k8s-snap.
+For worker nodes, we need to modify the existing code to avoid blocking
+the request until the certificates have been approved and issued. Instead,
+we can use a multi-step process: generating the CSRs, approving them, and
+then triggering the certificate renewal process.
+
diff --git a/docs/proposals/004-orchestration-refresh-certs.md b/docs/proposals/004-orchestration-refresh-certs.md
new file mode 100644
index 00000000..4ccb36f5
--- /dev/null
+++ b/docs/proposals/004-orchestration-refresh-certs.md
@@ -0,0 +1,266 @@
+
+
+# Proposal information
+
+
+- **Index**: 004
+
+
+- **Status**: ACCEPTED
+
+
+
+- **Name**: Cluster Orchestration - Certificate Refresh
+
+
+- **Owner**: Mateo Florido [@mateoflorido](https://github.com/mateoflorido)
+
+# Proposal Details
+
+## Summary
+
+
+This proposal aims to introduce a mechanism to refresh certificates for all
+nodes in a Canonical Kubernetes CAPI cluster simultaneously, removing the
+need to annotate each machine individually. This feature will allow
+administrators to trigger a cluster-wide certificate refresh through
+annotations on higher-level resources like `Cluster`, `CK8sControlPlane`, or
+`MachineDeployment`.
+
+## Rationale
+
+
+We currently have the ability to refresh the certificates for individual
+`Machine` resources in the cluster. However, this process can be time-consuming
+as it requires annotating each `Machine` resource individually and waiting for
+the certificates to refresh. In this proposal, we aim to introduce the
+capability to refresh certificates for all nodes in the cluster at once. This
+new feature will improve the user experience and speed up the process,
+especially in large clusters.
+
+## User facing changes
+
+
+Administrators can annotate the `Cluster`, `CK8sControlPlane` or
+`MachineDeployment` objects to trigger the certificate refresh for machines in the
+cluster, control plane nodes, or worker nodes, respectively.
+
+```yaml
+kubectl annotate cluster v1beta2.k8sd.io/refresh-certificates={expires-in}
+kubectl annotate ck8scontrolplane v1beta2.k8sd.io/refresh-certificates={expires-in}
+kubectl annotate machinedeployment v1beta2.k8sd.io/refresh-certificates={expires-in}
+```
+
+`expires-in` specifies how long the certificates will be valid. It can be
+expressed in years, months, days, or other time units supported by the
+`time.ParseDuration` function.
+
+## Alternative solutions
+
+
+As mentioned in [Proposal 003], the Kubeadm Control Plane Provider can
+refresh the certificates for the nodes in the cluster. However, this approach
+requires performing a rolling update of the machines owned by the cluster.
+
+## Out of scope
+
+
+This proposal does not include the functionality to refresh certificates via
+a rolling update of nodes or automatically trigger the process when
+certificates are close to expiring. Additionally, it does not cover
+the renewal of external certificates provided by the user or CA certificates.
+
+# Implementation Details
+
+## API Changes
+
+
+None
+
+## Bootstrap Provider Changes
+
+
+### Cluster Controller
+
+We will add a new controller, `ClusterCertificatesReconciler`, to the bootstrap
+provider.
This controller will monitor for `Cluster` objects and trigger a +certificate refresh for all nodes in the cluster when the +`v1beta2.k8sd.io/refresh-certificates` annotation is applied. + +The status of the certificate refresh process will be shared via the `Cluster` +object by emitting events. The events that the controller can emit are: +- `RefreshCertsInProgress`: The certificate refresh process has started. +- `RefreshCertsDone`: The certificate refresh process has finished successfully. +- `RefreshCertsFailed`: The certificate refresh process has failed. + +The controller should perform the following steps: +1. Retrieve the `CK8sControlPlane` object owned by the `Cluster` object. +2. Emit the `RefreshCertsInProgress` event for the `Cluster` object. +3. Trigger a certificate refresh for the control plane nodes by annotating the + `CK8sControlPlane` object with the `v1beta2.k8sd.io/refresh-certificates` + annotation. +4. Wait for the certificates to be refreshed on the control plane nodes. The + controller should check the `v1beta2.k8sd.io/refresh-certificates-status` + annotation to determine when the certificates have been refreshed. +5. If the refresh is successful, the controller proceeds to the + `MachineDeployment` objects. +6. For each `MachineDeployment` object, trigger a certificate refresh for the + worker nodes by annotating the `MachineDeployment` object with the + `v1beta2.k8sd.io/refresh-certificates` annotation. +7. Wait for the certificates to be refreshed on the worker nodes, checking the + `v1beta2.k8sd.io/refresh-certificates-status` annotation. +8. If the refresh is successful, the controller emits the `RefreshCertsDone` + event for the `Cluster` object. + +### MachineDeployment Controller + +We also need to add a new controller, `MachineDeployCertificatesReconciler`, to +the bootstrap provider. 
This controller will watch for `MachineDeployment`
+objects and trigger a certificate refresh for all the worker nodes in the
+cluster when the `v1beta2.k8sd.io/refresh-certificates` annotation is present.
+
+The controller should perform the following steps:
+1. List all machines owned by the `MachineDeployment` object and filter out the
+   control plane machines.
+2. Emit the `RefreshCertsInProgress` event for the `MachineDeployment` object.
+3. For each machine, trigger the certificate refresh by annotating the machine
+   with the `v1beta2.k8sd.io/refresh-certificates` annotation.
+4. Wait for the certificates to be refreshed on that machine. The controller
+   should check the `v1beta2.k8sd.io/refresh-certificates-status` annotation
+   to know when the certificates have been refreshed.
+5. If the refresh is successful, the controller moves to the next machine.
+
+The status of the certificate refresh process will be shared via the
+`MachineDeployment` object in the same way as the `Cluster` controller.
+
+## ControlPlane Provider Changes
+
+
+A controller, `ControlPlaneCertificatesReconciler`, will be added to the control plane
+provider. This controller will watch for the `CK8sControlPlane` objects and
+will trigger the certificate refresh for all the control plane nodes in the
+cluster when the `v1beta2.k8sd.io/refresh-certificates` annotation is present.
+
+The controller should perform the following steps:
+1. List all the control plane machines owned by the `CK8sControlPlane` object.
+2. Emit the `RefreshCertsInProgress` event for the `CK8sControlPlane` object.
+3. For each control plane machine, trigger the certificate refresh by annotating the
+   machine with the `v1beta2.k8sd.io/refresh-certificates` annotation.
+4. Wait for the certificates to be refreshed on that machine. The controller
+   should check the `v1beta2.k8sd.io/refresh-certificates-status`
+   annotation to know when the certificates have been refreshed.
+5. 
If the refresh is successful, the controller moves to the next machine.
+   If the refresh fails, the controller emits the `RefreshCertsFailed` event
+   for the `CK8sControlPlane` object and stops the process.
+6. Once all the control plane machines have been refreshed, the controller emits
+   the `RefreshCertsDone` event for the `CK8sControlPlane` object.
+
+As mentioned in the Bootstrap Provider Changes, the status of the certificate
+refresh process will be shared via the `CK8sControlPlane` object, using the
+same events as the `Cluster` controller.
+
+## Configuration Changes
+
+
+None
+
+## Documentation Changes
+
+This proposal will require a new section in the Canonical Kubernetes
+documentation explaining:
+- How-to page on how to refresh the certificates for a cluster.
+- Explanation page on how the refreshing orchestration process works.
+
+## Testing
+
+
+- Unit tests for the controllers in the bootstrap and control plane providers.
+- Integration tests which cover the refreshing process for the certificates in
+  the cluster using the `v1beta2.k8sd.io/refresh-certificates` annotation in
+  the `Cluster`, `CK8sControlPlane` and `MachineDeployment` objects.
+
+## Considerations for backwards compatibility
+
+
+None
+
+## Implementation notes and guidelines
+
+For this implementation, we can take as a reference the implementation of the
+`CertificateController` in our repository. This controller is responsible for
+refreshing the certificates for the Machines in the cluster. We are going to
+leverage the logic in there to offload the certificate refresh process to this
+controller.
+ + + +[Proposal 003]: 003-refresh-certs.md + diff --git a/go.mod b/go.mod index e09f131e..ad40c496 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,12 @@ module github.com/canonical/cluster-api-k8s go 1.22.6 require ( - github.com/canonical/k8s-snap-api v1.0.8 + github.com/canonical/k8s-snap-api v1.0.9 github.com/go-logr/logr v1.4.1 github.com/google/uuid v1.4.0 github.com/onsi/ginkgo v1.16.5 github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.32.0 - github.com/pkg/errors v0.9.1 google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.3 @@ -92,6 +91,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect diff --git a/go.sum b/go.sum index 9b2643a8..1c6433e8 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= -github.com/canonical/k8s-snap-api v1.0.8 h1:W360Y4ulkAdCdQqbfQ7zXs3/Ty8SWENO3/Bzz8ZAEPE= -github.com/canonical/k8s-snap-api v1.0.8/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY= +github.com/canonical/k8s-snap-api v1.0.9 h1:WhbyVtnR0GIAdY1UYBIzkspfgodxrHjlpT9FbG4NIu4= +github.com/canonical/k8s-snap-api v1.0.9/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.2.0 
h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= diff --git a/pkg/ck8s/control_plane.go b/pkg/ck8s/control_plane.go index 0861d959..a9080001 100644 --- a/pkg/ck8s/control_plane.go +++ b/pkg/ck8s/control_plane.go @@ -340,7 +340,6 @@ func (c *ControlPlane) PatchMachines(ctx context.Context) error { if helper, ok := c.machinesPatchHelpers[machine.Name]; ok { if err := helper.Patch(ctx, machine, patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{ controlplanev1.MachineAgentHealthyCondition, - controlplanev1.MachineEtcdMemberHealthyCondition, }}); err != nil { errList = append(errList, fmt.Errorf("failed to patch machine %s: %w", machine.Name, err)) } diff --git a/pkg/ck8s/management_cluster.go b/pkg/ck8s/management_cluster.go index 2af847f1..51dcb6f9 100644 --- a/pkg/ck8s/management_cluster.go +++ b/pkg/ck8s/management_cluster.go @@ -104,69 +104,9 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O ClientRestConfig: restConfig, K8sdClientGenerator: g, microclusterPort: microclusterPort, - - /** - CoreDNSMigrator: &CoreDNSMigrator{}, - **/ - } - // NOTE(neoaggelos): Upstream creates an etcd client generator, so that users can reach etcd on each node. - // - // TODO(neoaggelos): For Canonical Kubernetes, we need to create a client generator for the k8sd endpoints on the control plane nodes. 
- - /** - // Retrieves the etcd CA key Pair - crtData, keyData, err := m.getEtcdCAKeyPair(ctx, clusterKey) - if err != nil { - return nil, err } - // If etcd CA is not nil, then it's managed etcd - if crtData != nil { - clientCert, err := generateClientCert(crtData, keyData) - if err != nil { - return nil, err - } - - caPool := x509.NewCertPool() - caPool.AppendCertsFromPEM(crtData) - tlsConfig := &tls.Config{ - RootCAs: caPool, - Certificates: []tls.Certificate{clientCert}, - MinVersion: tls.VersionTLS12, - } - tlsConfig.InsecureSkipVerify = true - workload.etcdClientGenerator = NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout, m.EtcdCallTimeout) - } - **/ - return workload, nil } -//nolint:godot -/** -func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.ObjectKey) ([]byte, []byte, error) { - etcdCASecret := &corev1.Secret{} - etcdCAObjectKey := client.ObjectKey{ - Namespace: clusterKey.Namespace, - Name: fmt.Sprintf("%s-etcd", clusterKey.Name), - } - - // Try to get the certificate via the uncached client. 
- if err := m.Client.Get(ctx, etcdCAObjectKey, etcdCASecret); err != nil { - if apierrors.IsNotFound(err) { - return nil, nil, nil - } else { - return nil, nil, errors.Wrapf(err, "failed to get secret; etcd CA bundle %s/%s", etcdCAObjectKey.Namespace, etcdCAObjectKey.Name) - } - } - - crtData, ok := etcdCASecret.Data[secret.TLSCrtDataName] - if !ok { - return nil, nil, errors.Errorf("etcd tls crt does not exist for cluster %s/%s", clusterKey.Namespace, clusterKey.Name) - } - keyData := etcdCASecret.Data[secret.TLSKeyDataName] - return crtData, keyData, nil -} -**/ - var _ ManagementCluster = &Management{} diff --git a/pkg/ck8s/workload_cluster.go b/pkg/ck8s/workload_cluster.go index 5cf6de1d..76d6c87d 100644 --- a/pkg/ck8s/workload_cluster.go +++ b/pkg/ck8s/workload_cluster.go @@ -32,10 +32,6 @@ const ( k8sdConfigSecretName = "k8sd-config" //nolint:gosec ) -var ( - ErrControlPlaneMinNodes = errors.New("cluster has fewer than 2 control plane nodes; removing an etcd member is not supported") -) - // WorkloadCluster defines all behaviors necessary to upgrade kubernetes on a workload cluster // // TODO: Add a detailed description to each of these method definitions. @@ -43,28 +39,10 @@ type WorkloadCluster interface { // Basic health and status checks. ClusterStatus(ctx context.Context) (ClusterStatus, error) UpdateAgentConditions(ctx context.Context, controlPlane *ControlPlane) - UpdateEtcdConditions(ctx context.Context, controlPlane *ControlPlane) NewControlPlaneJoinToken(ctx context.Context, name string) (string, error) NewWorkerJoinToken(ctx context.Context) (string, error) RemoveMachineFromCluster(ctx context.Context, machine *clusterv1.Machine) error - - // NOTE(neoaggelos): See notes in (*CK8sControlPlaneReconciler).reconcileEtcdMembers - // - // TODO(neoaggelos): Replace with operations that use the k8sd proxy with things we need. 
For example, the function to remove a node _could_ be: - // - // RemoveMachineFromCluster(ctx context.Context, machine *clusterv1.Machine) - // - // Then, the implementation of WorkloadCluster should handle everything (reaching to k8sd, calling the right endpoints, authenticating, etc) - // internally. - /** - // Etcd tasks - RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) (bool, error) - ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error - ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) - **/ - - // AllowBootstrapTokensToGetNodes(ctx context.Context) error } // Workload defines operations on workload clusters. @@ -76,13 +54,6 @@ type Workload struct { ClientRestConfig *rest.Config K8sdClientGenerator *k8sdClientGenerator microclusterPort int - - // NOTE(neoaggelos): CoreDNSMigrator and etcdClientGenerator are used by upstream to reach and manage the services in the workload cluster - // TODO(neoaggelos): Replace them with a k8sdProxyClientGenerator. - /** - CoreDNSMigrator coreDNSMigrator - etcdClientGenerator etcdClientFor - **/ } // ClusterStatus holds stats information about the cluster. @@ -100,9 +71,6 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList, labels := map[string]string{ // NOTE(neoaggelos): Canonical Kubernetes uses node-role.kubernetes.io/control-plane="" as a label for control plane nodes. labelNodeRoleControlPlane: "", - /** - labelNodeRoleControlPlane: "true", - **/ } if err := w.Client.List(ctx, nodes, ctrlclient.MatchingLabels(labels)); err != nil { return nil, err @@ -726,265 +694,4 @@ func aggregateFromMachinesToKCP(input aggregateFromMachinesToKCPInput) { // So there will be no condition at KCP level too. } -// UpdateEtcdConditions is responsible for updating machine conditions reflecting the status of all the etcd members. 
-// This operation is best effort, in the sense that in case of problems in retrieving member status, it sets -// the condition to Unknown state without returning any error. -func (w *Workload) UpdateEtcdConditions(ctx context.Context, controlPlane *ControlPlane) { - w.updateManagedEtcdConditions(ctx, controlPlane) -} - -func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane *ControlPlane) { - // NOTE: This methods uses control plane nodes only to get in contact with etcd but then it relies on etcd - // as ultimate source of truth for the list of members and for their health. - controlPlaneNodes, err := w.getControlPlaneNodes(ctx) - if err != nil { - conditions.MarkUnknown(controlPlane.KCP, controlplanev1.EtcdClusterHealthyCondition, controlplanev1.EtcdClusterInspectionFailedReason, "Failed to list nodes which are hosting the etcd members") - for _, m := range controlPlane.Machines { - conditions.MarkUnknown(m, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberInspectionFailedReason, "Failed to get the node which is hosting the etcd member") - } - return - } - - // NOTE(neoaggelos): Upstream queries the etcd cluster endpoint on each of the machine nodes. It verifies that the list of etcd peers retrieved by - // every node in the cluster matches with other nodes, and also verifies that they report the same etcd cluster ID. - // - // In the case of k8s-dqlite, we should do similar steps against the k8s-dqlite cluster. Until that is implemented, we skip this check and assume - // that the node's datastore is in healthy condition if there are matching clusterv1.Machine and corev1.Node objects. - // - // TODO(neoaggelos): Implement API endpoints in k8sd to reach the local k8s-dqlite node and report the known cluster members. Then, verify that the - // list of members matches across all the nodes. - - // Update conditions for etcd members on the nodes. 
- var ( - // kcpErrors is used to store errors that can't be reported on any machine. - kcpErrors []string - /** - // clusterID is used to store and compare the etcd's cluster id. - clusterID *uint64 - // members is used to store the list of etcd members and compare with all the other nodes in the cluster. - members []*etcd.Member - **/ - ) - - for _, node := range controlPlaneNodes.Items { - // Search for the machine corresponding to the node. - var machine *clusterv1.Machine - for _, m := range controlPlane.Machines { - if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name { - machine = m - } - } - - if machine == nil { - // If there are machines still provisioning there is the chance that a chance that a node might be linked to a machine soon, - // otherwise report the error at KCP level given that there is no machine to report on. - if hasProvisioningMachine(controlPlane.Machines) { - continue - } - kcpErrors = append(kcpErrors, fmt.Sprintf("Control plane node %s does not have a corresponding machine", node.Name)) - continue - } - - // If the machine is deleting, report all the conditions as deleting - if !machine.ObjectMeta.DeletionTimestamp.IsZero() { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "") - continue - } - - /** - currentMembers, err := w.getCurrentEtcdMembers(ctx, machine, node.Name) - if err != nil { - continue - } - - // Check if the list of members IDs reported is the same as all other members. - // NOTE: the first member reporting this information is the baseline for this information. 
- if members == nil { - members = currentMembers - } - if !etcdutil.MemberEqual(members, currentMembers) { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "etcd member reports the cluster is composed by members %s, but all previously seen etcd members are reporting %s", etcdutil.MemberNames(currentMembers), etcdutil.MemberNames(members)) - continue - } - - // Retrieve the member and check for alarms. - // NB. The member for this node always exists given forFirstAvailableNode(node) used above - member := etcdutil.MemberForName(currentMembers, node.Name) - if member == nil { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "etcd member reports the cluster is composed by members %s, but the member itself (%s) is not included", etcdutil.MemberNames(currentMembers), node.Name) - continue - } - if len(member.Alarms) > 0 { - alarmList := []string{} - for _, alarm := range member.Alarms { - switch alarm { - case etcd.AlarmOK: - continue - default: - alarmList = append(alarmList, etcd.AlarmTypeName[alarm]) - } - } - if len(alarmList) > 0 { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Etcd member reports alarms: %s", strings.Join(alarmList, ", ")) - continue - } - } - - // Check if the member belongs to the same cluster as all other members. - // NOTE: the first member reporting this information is the baseline for this information. 
- if clusterID == nil { - clusterID = &member.ClusterID - } - if *clusterID != member.ClusterID { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "etcd member has cluster ID %d, but all previously seen etcd members have cluster ID %d", member.ClusterID, *clusterID) - continue - } - **/ - - conditions.MarkTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition) - } - - /** - // Make sure that the list of etcd members and machines is consistent. - kcpErrors = compareMachinesAndMembers(controlPlane, members, kcpErrors) - **/ - - // Aggregate components error from machines at KCP level - aggregateFromMachinesToKCP(aggregateFromMachinesToKCPInput{ - controlPlane: controlPlane, - machineConditions: []clusterv1.ConditionType{controlplanev1.MachineEtcdMemberHealthyCondition}, - kcpErrors: kcpErrors, - condition: controlplanev1.EtcdClusterHealthyCondition, - unhealthyReason: controlplanev1.EtcdClusterUnhealthyReason, - unknownReason: controlplanev1.EtcdClusterUnknownReason, - note: "etcd member", - }) -} - -//nolint:godot -/** -func (w *Workload) getCurrentEtcdMembers(ctx context.Context, machine *clusterv1.Machine, nodeName string) ([]*etcd.Member, error) { - // Create the etcd Client for the etcd Pod scheduled on the Node - etcdClient, err := w.etcdClientGenerator.forFirstAvailableNode(ctx, []string{nodeName}) - if err != nil { - conditions.MarkUnknown(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberInspectionFailedReason, "Failed to connect to the etcd pod on the %s node: %s", nodeName, err) - return nil, errors.Wrapf(err, "failed to get current etcd members: failed to connect to the etcd pod on the %s node", nodeName) - } - defer etcdClient.Close() - - // While creating a new client, forFirstAvailableNode retrieves the status for the endpoint; check if the endpoint has errors. 
- if len(etcdClient.Errors) > 0 { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Etcd member status reports errors: %s", strings.Join(etcdClient.Errors, ", ")) - return nil, errors.Errorf("failed to get current etcd members: etcd member status reports errors: %s", strings.Join(etcdClient.Errors, ", ")) - } - - // Gets the list etcd members known by this member. - currentMembers, err := etcdClient.Members(ctx) - if err != nil { - // NB. We should never be in here, given that we just received answer to the etcd calls included in forFirstAvailableNode; - // however, we are considering the calls to Members a signal of etcd not being stable. - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Failed get answer from the etcd member on the %s node", nodeName) - return nil, errors.Errorf("failed to get current etcd members: failed get answer from the etcd member on the %s node", nodeName) - } - - return currentMembers, nil -} - -func compareMachinesAndMembers(controlPlane *ControlPlane, members []*etcd.Member, kcpErrors []string) []string { - // NOTE: We run this check only if we actually know the list of members, otherwise the first for loop - // could generate a false negative when reporting missing etcd members. - if members == nil { - return kcpErrors - } - - // Check Machine -> Etcd member. 
- for _, machine := range controlPlane.Machines { - if machine.Status.NodeRef == nil { - continue - } - found := false - for _, member := range members { - nodeNameFromMember := etcdutil.NodeNameFromMember(member) - if machine.Status.NodeRef.Name == nodeNameFromMember { - found = true - break - } - } - if !found { - conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberUnhealthyReason, clusterv1.ConditionSeverityError, "Missing etcd member") - } - } - - // Check Etcd member -> Machine. - for _, member := range members { - found := false - nodeNameFromMember := etcdutil.NodeNameFromMember(member) - for _, machine := range controlPlane.Machines { - if machine.Status.NodeRef != nil && machine.Status.NodeRef.Name == nodeNameFromMember { - found = true - break - } - } - if !found { - name := nodeNameFromMember - if name == "" { - name = fmt.Sprintf("%d (Name not yet assigned)", member.ID) - } - kcpErrors = append(kcpErrors, fmt.Sprintf("etcd member %s does not have a corresponding machine", name)) - } - } - return kcpErrors -} - -func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) { - // TODO: need to cache clientkey to clusterCacheTracker to avoid recreating key frequently - clientKey, err := certs.NewPrivateKey() - if err != nil { - return tls.Certificate{}, errors.Wrapf(err, "error creating client key") - } - - caCert, err := certs.DecodeCertPEM(caCertEncoded) - if err != nil { - return tls.Certificate{}, err - } - caKey, err := certs.DecodePrivateKeyPEM(caKeyEncoded) - if err != nil { - return tls.Certificate{}, err - } - x509Cert, err := newClientCert(caCert, clientKey, caKey) - if err != nil { - return tls.Certificate{}, err - } - return tls.X509KeyPair(certs.EncodeCertPEM(x509Cert), certs.EncodePrivateKeyPEM(clientKey)) -} - -func newClientCert(caCert *x509.Certificate, key *rsa.PrivateKey, caKey crypto.Signer) (*x509.Certificate, error) { - cfg := certs.Config{ - CommonName: 
"cluster-api.x-k8s.io", - } - - now := time.Now().UTC() - - tmpl := x509.Certificate{ - SerialNumber: new(big.Int).SetInt64(0), - Subject: pkix.Name{ - CommonName: cfg.CommonName, - Organization: cfg.Organization, - }, - NotBefore: now.Add(time.Minute * -5), - NotAfter: now.Add(time.Hour * 24 * 365 * 10), // 10 years - KeyUsage: x509.KeyUsageDigitalSignature, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, - } - - b, err := x509.CreateCertificate(rand.Reader, &tmpl, caCert, key.Public(), caKey) - if err != nil { - return nil, errors.Wrapf(err, "failed to create signed client certificate: %+v", tmpl) - } - - c, err := x509.ParseCertificate(b) - return c, errors.WithStack(err) -} -**/ - var _ WorkloadCluster = &Workload{} diff --git a/pkg/ck8s/workload_cluster_k8sd.go b/pkg/ck8s/workload_cluster_k8sd.go index 8b0b312d..d475a8da 100644 --- a/pkg/ck8s/workload_cluster_k8sd.go +++ b/pkg/ck8s/workload_cluster_k8sd.go @@ -4,11 +4,11 @@ import ( "context" "crypto/tls" _ "embed" + "errors" "fmt" "net/http" "time" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -71,7 +71,7 @@ func (g *k8sdClientGenerator) forNode(ctx context.Context, node *corev1.Node) (* func (g *k8sdClientGenerator) getProxyPods(ctx context.Context) (map[string]string, error) { pods, err := g.clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{LabelSelector: "app=k8sd-proxy"}) if err != nil { - return nil, errors.Wrap(err, "unable to list k8sd-proxy pods in target cluster") + return nil, fmt.Errorf("unable to list k8sd-proxy pods in target cluster: %w", err) } if len(pods.Items) == 0 { diff --git a/pkg/cloudinit/common.go b/pkg/cloudinit/common.go index 11d8aaaa..afad66b2 100644 --- a/pkg/cloudinit/common.go +++ b/pkg/cloudinit/common.go @@ -9,9 +9,26 @@ import ( "k8s.io/apimachinery/pkg/util/version" ) +type InstallOption string + +const ( + InstallOptionChannel InstallOption = "channel" + 
InstallOptionRevision InstallOption = "revision" + InstallOptionLocalPath InstallOption = "local-path" +) + +type SnapInstallData struct { + // Option is the snap install option e.g. --channel, --revision. + Option InstallOption + // Value is the snap install value e.g. 1.30/stable, 123, /path/to/k8s.snap. + Value string +} + type BaseUserData struct { // KubernetesVersion is the Kubernetes version from the cluster object. KubernetesVersion string + // SnapInstallData is the snap install data. + SnapInstallData SnapInstallData // BootCommands is a list of commands to run early in the boot process. BootCommands []string // PreRunCommands is a list of commands to run prior to k8s installation. @@ -24,6 +41,12 @@ type BaseUserData struct { ConfigFileContents string // AirGapped declares that a custom installation script is to be used. AirGapped bool + // The snap store proxy domain's scheme, e.g. "http" or "https" without "://" + SnapstoreProxyScheme string + // The snap store proxy domain + SnapstoreProxyDomain string + // The snap store proxy ID + SnapstoreProxyID string // MicroclusterAddress is the address to use for microcluster. MicroclusterAddress string // MicroclusterPort is the port to use for microcluster. @@ -40,6 +63,13 @@ func NewBaseCloudConfig(data BaseUserData) (CloudConfig, error) { return CloudConfig{}, fmt.Errorf("failed to parse kubernetes version %q: %w", data.KubernetesVersion, err) } + snapInstall := data.SnapInstallData + // Default to k8s version if snap install option is not set or empty. 
+ if snapInstall.Option == "" || snapInstall.Value == "" { + snapInstall.Option = InstallOptionChannel + snapInstall.Value = fmt.Sprintf("%d.%d-classic/stable", kubernetesVersion.Major(), kubernetesVersion.Minor()) + } + config := CloudConfig{ RunCommands: []string{"set -x"}, WriteFiles: make([]File, 0, len(scripts)+len(data.ExtraFiles)+3), @@ -54,6 +84,13 @@ func NewBaseCloudConfig(data BaseUserData) (CloudConfig, error) { Owner: "root:root", }) } + + // snapstore proxy configuration + if snapStoreConfigFiles := getSnapstoreProxyConfigFiles(data); snapStoreConfigFiles != nil { + config.WriteFiles = append(config.WriteFiles, snapStoreConfigFiles...) + config.RunCommands = append(config.RunCommands, "/capi/scripts/configure-snapstore-proxy.sh") + } + // write files config.WriteFiles = append( config.WriteFiles, @@ -84,8 +121,8 @@ func NewBaseCloudConfig(data BaseUserData) (CloudConfig, error) { Owner: "root:root", }, File{ - Path: "/capi/etc/snap-track", - Content: fmt.Sprintf("%d.%d-classic/stable", kubernetesVersion.Major(), kubernetesVersion.Minor()), + Path: fmt.Sprintf("/capi/etc/snap-%s", snapInstall.Option), + Content: snapInstall.Value, Permissions: "0400", Owner: "root:root", }, @@ -101,3 +138,44 @@ func NewBaseCloudConfig(data BaseUserData) (CloudConfig, error) { func makeMicroclusterAddress(address string, port int) string { return net.JoinHostPort(address, strconv.Itoa(port)) } + +// getSnapstoreProxyConfigFiles returns the snapstore proxy config files. +// If the snapstore proxy domain or ID is not set, it returns nil. +// Nil indicates that no files are returned. 
+func getSnapstoreProxyConfigFiles(data BaseUserData) []File { + snapstoreProxyScheme := data.SnapstoreProxyScheme + snapstoreProxyDomain := data.SnapstoreProxyDomain + snapstoreProxyID := data.SnapstoreProxyID + + scheme := "http" + if snapstoreProxyScheme != "" { + scheme = snapstoreProxyScheme + } + + if snapstoreProxyDomain == "" || snapstoreProxyID == "" { + return nil + } + + schemeFile := File{ + Path: "/capi/etc/snapstore-proxy-scheme", + Content: scheme, + Permissions: "0400", + Owner: "root:root", + } + + domainFile := File{ + Path: "/capi/etc/snapstore-proxy-domain", + Content: snapstoreProxyDomain, + Permissions: "0400", + Owner: "root:root", + } + + storeIDFile := File{ + Path: "/capi/etc/snapstore-proxy-id", + Content: snapstoreProxyID, + Permissions: "0400", + Owner: "root:root", + } + + return []File{schemeFile, domainFile, storeIDFile} +} diff --git a/pkg/cloudinit/controlplane_init_test.go b/pkg/cloudinit/controlplane_init_test.go index 4da71d19..d497d1d0 100644 --- a/pkg/cloudinit/controlplane_init_test.go +++ b/pkg/cloudinit/controlplane_init_test.go @@ -17,9 +17,11 @@ limitations under the License. package cloudinit_test import ( + "fmt" "testing" . 
"github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" "github.com/canonical/cluster-api-k8s/pkg/cloudinit" ) @@ -29,10 +31,13 @@ func TestNewInitControlPlane(t *testing.T) { config, err := cloudinit.NewInitControlPlane(cloudinit.InitControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ - KubernetesVersion: "v1.30.0", - BootCommands: []string{"bootcmd"}, - PreRunCommands: []string{"prerun1", "prerun2"}, - PostRunCommands: []string{"postrun1", "postrun2"}, + KubernetesVersion: "v1.30.0", + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + SnapstoreProxyScheme: "http", + SnapstoreProxyDomain: "snapstore.io", + SnapstoreProxyID: "abcd-1234-xyz", ExtraFiles: []cloudinit.File{{ Path: "/tmp/file", Content: "test file", @@ -54,6 +59,7 @@ func TestNewInitControlPlane(t *testing.T) { // Verify the run commands. g.Expect(config.RunCommands).To(Equal([]string{ "set -x", + "/capi/scripts/configure-snapstore-proxy.sh", "prerun1", "prerun2", "/capi/scripts/install.sh", @@ -79,13 +85,17 @@ func TestNewInitControlPlane(t *testing.T) { HaveField("Path", "/capi/scripts/configure-auth-token.sh"), HaveField("Path", "/capi/scripts/configure-node-token.sh"), HaveField("Path", "/capi/scripts/create-sentinel-bootstrap.sh"), + HaveField("Path", "/capi/scripts/configure-snapstore-proxy.sh"), HaveField("Path", "/capi/etc/config.yaml"), HaveField("Path", "/capi/etc/microcluster-address"), HaveField("Path", "/capi/etc/node-name"), HaveField("Path", "/capi/etc/node-token"), HaveField("Path", "/capi/etc/token"), - HaveField("Path", "/capi/etc/snap-track"), + HaveField("Path", "/capi/etc/snap-channel"), HaveField("Path", "/capi/manifests/00-k8sd-proxy.yaml"), + HaveField("Path", "/capi/etc/snapstore-proxy-scheme"), + HaveField("Path", "/capi/etc/snapstore-proxy-domain"), + HaveField("Path", "/capi/etc/snapstore-proxy-id"), HaveField("Path", "/tmp/file"), ), "Some /capi/scripts files are missing") } 
@@ -121,3 +131,74 @@ func TestNewInitControlPlaneAirGapped(t *testing.T) { // Verify the run commands is missing install.sh script. g.Expect(config.RunCommands).NotTo(ContainElement("/capi/scripts/install.sh")) } + +func TestNewInitControlPlaneSnapInstall(t *testing.T) { + t.Run("DefaultSnapInstall", func(t *testing.T) { + g := NewWithT(t) + + config, err := cloudinit.NewInitControlPlane(cloudinit.InitControlPlaneInput{ + BaseUserData: cloudinit.BaseUserData{ + KubernetesVersion: "v1.30.0", + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + }}) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(config.WriteFiles).To(ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Path": Equal(fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionChannel)), + "Content": Equal("1.30-classic/stable"), + }))) + g.Expect(config.WriteFiles).ToNot(ContainElement(HaveField("Path", fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionRevision)))) + g.Expect(config.WriteFiles).ToNot(ContainElement(HaveField("Path", fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionLocalPath)))) + }) + + tests := []struct { + name string + snapInstall cloudinit.SnapInstallData + }{ + { + name: "ChannelOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionChannel, + Value: "v1.30/stable", + }, + }, + { + name: "RevisionOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionRevision, + Value: "123", + }, + }, + { + name: "LocalPathOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionLocalPath, + Value: "/path/to/k8s.snap", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + config, err := cloudinit.NewInitControlPlane(cloudinit.InitControlPlaneInput{ + BaseUserData: cloudinit.BaseUserData{ + KubernetesVersion: "v1.30.0", + 
SnapInstallData: tt.snapInstall, + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + }}) + + g.Expect(err).NotTo(HaveOccurred()) + + g.Expect(config.WriteFiles).To(ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Path": Equal(fmt.Sprintf("/capi/etc/snap-%s", tt.snapInstall.Option)), + "Content": Equal(tt.snapInstall.Value), + }))) + }) + } +} diff --git a/pkg/cloudinit/controlplane_join_test.go b/pkg/cloudinit/controlplane_join_test.go index c2a4d5cd..6f4155d3 100644 --- a/pkg/cloudinit/controlplane_join_test.go +++ b/pkg/cloudinit/controlplane_join_test.go @@ -1,9 +1,11 @@ package cloudinit_test import ( + "fmt" "testing" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" "github.com/canonical/cluster-api-k8s/pkg/cloudinit" ) @@ -13,10 +15,13 @@ func TestNewJoinControlPlane(t *testing.T) { config, err := cloudinit.NewJoinControlPlane(cloudinit.JoinControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ - KubernetesVersion: "v1.30.0", - BootCommands: []string{"bootcmd"}, - PreRunCommands: []string{"prerun1", "prerun2"}, - PostRunCommands: []string{"postrun1", "postrun2"}, + KubernetesVersion: "v1.30.0", + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + SnapstoreProxyScheme: "http", + SnapstoreProxyDomain: "snapstore.io", + SnapstoreProxyID: "abcd-1234-xyz", ExtraFiles: []cloudinit.File{{ Path: "/tmp/file", Content: "test file", @@ -37,6 +42,7 @@ func TestNewJoinControlPlane(t *testing.T) { // Verify the run commands. 
g.Expect(config.RunCommands).To(Equal([]string{ "set -x", + "/capi/scripts/configure-snapstore-proxy.sh", "prerun1", "prerun2", "/capi/scripts/install.sh", @@ -60,12 +66,16 @@ func TestNewJoinControlPlane(t *testing.T) { HaveField("Path", "/capi/scripts/configure-auth-token.sh"), HaveField("Path", "/capi/scripts/configure-node-token.sh"), HaveField("Path", "/capi/scripts/create-sentinel-bootstrap.sh"), + HaveField("Path", "/capi/scripts/configure-snapstore-proxy.sh"), HaveField("Path", "/capi/etc/config.yaml"), HaveField("Path", "/capi/etc/microcluster-address"), HaveField("Path", "/capi/etc/node-name"), HaveField("Path", "/capi/etc/node-token"), HaveField("Path", "/capi/etc/join-token"), - HaveField("Path", "/capi/etc/snap-track"), + HaveField("Path", "/capi/etc/snap-channel"), + HaveField("Path", "/capi/etc/snapstore-proxy-scheme"), + HaveField("Path", "/capi/etc/snapstore-proxy-domain"), + HaveField("Path", "/capi/etc/snapstore-proxy-id"), HaveField("Path", "/tmp/file"), ), "Some /capi/scripts files are missing") } @@ -105,3 +115,74 @@ func TestNewJoinControlPlaneAirGapped(t *testing.T) { // Verify the run commands is missing install.sh script. 
g.Expect(config.RunCommands).NotTo(ContainElement("/capi/scripts/install.sh")) } + +func TestNewJoinControlPlaneSnapInstall(t *testing.T) { + t.Run("DefaultSnapInstall", func(t *testing.T) { + g := NewWithT(t) + + config, err := cloudinit.NewJoinControlPlane(cloudinit.JoinControlPlaneInput{ + BaseUserData: cloudinit.BaseUserData{ + KubernetesVersion: "v1.30.0", + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + }}) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(config.WriteFiles).To(ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Path": Equal(fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionChannel)), + "Content": Equal("1.30-classic/stable"), + }))) + g.Expect(config.WriteFiles).ToNot(ContainElement(HaveField("Path", fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionRevision)))) + g.Expect(config.WriteFiles).ToNot(ContainElement(HaveField("Path", fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionLocalPath)))) + }) + + tests := []struct { + name string + snapInstall cloudinit.SnapInstallData + }{ + { + name: "ChannelOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionChannel, + Value: "v1.30/stable", + }, + }, + { + name: "RevisionOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionRevision, + Value: "123", + }, + }, + { + name: "LocalPathOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionLocalPath, + Value: "/path/to/k8s.snap", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + config, err := cloudinit.NewJoinControlPlane(cloudinit.JoinControlPlaneInput{ + BaseUserData: cloudinit.BaseUserData{ + KubernetesVersion: "v1.30.0", + SnapInstallData: tt.snapInstall, + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: 
[]string{"postrun1", "postrun2"}, + }}) + + g.Expect(err).NotTo(HaveOccurred()) + + g.Expect(config.WriteFiles).To(ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Path": Equal(fmt.Sprintf("/capi/etc/snap-%s", tt.snapInstall.Option)), + "Content": Equal(tt.snapInstall.Value), + }))) + }) + } +} diff --git a/pkg/cloudinit/embed.go b/pkg/cloudinit/embed.go index 1c6f2779..3344056a 100644 --- a/pkg/cloudinit/embed.go +++ b/pkg/cloudinit/embed.go @@ -12,6 +12,8 @@ var ( type script string +// NOTE(eac): If you want to use a script from pkg/cloudinit/scripts in your code (for example, you want to include a script in the user-data.txt), +// you need to add it to the scripts map below. var ( scriptInstall script = "install.sh" scriptBootstrap script = "bootstrap.sh" @@ -22,6 +24,7 @@ var ( scriptWaitAPIServerReady script = "wait-apiserver-ready.sh" scriptDeployManifests script = "deploy-manifests.sh" scriptCreateSentinelBootstrap script = "create-sentinel-bootstrap.sh" + scriptConfigureSnapstoreProxy script = "configure-snapstore-proxy.sh" ) func mustEmbed(s script) string { @@ -44,5 +47,6 @@ var ( scriptWaitAPIServerReady: mustEmbed(scriptWaitAPIServerReady), scriptDeployManifests: mustEmbed(scriptDeployManifests), scriptCreateSentinelBootstrap: mustEmbed(scriptCreateSentinelBootstrap), + scriptConfigureSnapstoreProxy: mustEmbed(scriptConfigureSnapstoreProxy), } ) diff --git a/pkg/cloudinit/scripts/configure-snapstore-proxy.sh b/pkg/cloudinit/scripts/configure-snapstore-proxy.sh new file mode 100644 index 00000000..fd6bb71e --- /dev/null +++ b/pkg/cloudinit/scripts/configure-snapstore-proxy.sh @@ -0,0 +1,55 @@ +#!/bin/bash -xe + +# Assumptions: +# - snapd is installed +# - /capi/etc/snapstore-proxy-scheme contains the snapstore scheme +# - /capi/etc/snapstore-proxy-domain contains the snapstore domain +# - /capi/etc/snapstore-proxy-id contains the snapstore id + +if [ ! -s /capi/etc/snapstore-proxy-scheme ] || [ ! 
-s /capi/etc/snapstore-proxy-domain ] || [ ! -s /capi/etc/snapstore-proxy-id ]; then + echo "Missing or empty snapstore proxy configuration files, exiting." + exit 1 +fi + +SNAPSTORE_PROXY_SCHEME=$(cat /capi/etc/snapstore-proxy-scheme) +SNAPSTORE_PROXY_DOMAIN=$(cat /capi/etc/snapstore-proxy-domain) +SNAPSTORE_PROXY_ID=$(cat /capi/etc/snapstore-proxy-id) + +if ! type -P curl; then + count=0 + while ! snap install curl; do + count=$((count + 1)) + if [ $count -gt 5 ]; then + echo "Failed to install curl, exiting." + exit 1 + fi + echo "Failed to install curl, retrying ($count/5)" + sleep 5 + done +fi + +count=0 +while ! curl -sL "${SNAPSTORE_PROXY_SCHEME}"://"${SNAPSTORE_PROXY_DOMAIN}"/v2/auth/store/assertions | snap ack /dev/stdin; do + count=$((count + 1)) + if [ $count -ge 5 ]; then + echo "Failed to ACK store assertions, exiting." + exit 1 + fi + + echo "Failed to ACK store assertions, retrying ($count/5)" + sleep 5 +done + +count=0 +while ! snap set core proxy.store="${SNAPSTORE_PROXY_ID}"; do + count=$((count + 1)) + if [ $count -ge 5 ]; then + echo "Failed to configure snapd with store ID, exiting." + exit 1 + fi + + echo "Failed to configure snapd with store ID, retrying ($count/5)" + sleep 5 +done + +systemctl restart snapd diff --git a/pkg/cloudinit/scripts/install.sh b/pkg/cloudinit/scripts/install.sh index 11fc04cc..bcc01a3e 100644 --- a/pkg/cloudinit/scripts/install.sh +++ b/pkg/cloudinit/scripts/install.sh @@ -1,8 +1,20 @@ #!/bin/bash -xe ## Assumptions: -## - /capi/etc/snap-track contains the snap track that matches the installed Kubernetes version, e.g. "v1.30.1" -> "1.30-classic/stable" +## - /capi/etc/snap-channel contains the snap channel to be installed that matches the desired Kubernetes version, e.g. "v1.30.1" -> "1.30-classic/stable" +## - /capi/etc/snap-revision contains the snap revision to be installed, e.g. 123 +## - /capi/etc/snap-local-path contains the path to the local snap file to be installed, e.g. 
/path/to/k8s.snap -snap_track="$(cat /capi/etc/snap-track)" - -snap install k8s --classic --channel "${snap_track}" +if [ -f "/capi/etc/snap-channel" ]; then + snap_channel="$(cat /capi/etc/snap-channel)" + snap install k8s --classic --channel "${snap_channel}" +elif [ -f "/capi/etc/snap-revision" ]; then + snap_revision="$(cat /capi/etc/snap-revision)" + snap install k8s --classic --revision "${snap_revision}" +elif [ -f "/capi/etc/snap-local-path" ]; then + snap_local_path="$(cat /capi/etc/snap-local-path)" + snap install k8s --classic --dangerous "${snap_local_path}" +else + echo "No snap installation option found" + exit 1 +fi diff --git a/pkg/cloudinit/worker_join_test.go b/pkg/cloudinit/worker_join_test.go index f9d85be6..d9a55384 100644 --- a/pkg/cloudinit/worker_join_test.go +++ b/pkg/cloudinit/worker_join_test.go @@ -1,9 +1,11 @@ package cloudinit_test import ( + "fmt" "testing" . "github.com/onsi/gomega" + "github.com/onsi/gomega/gstruct" "github.com/canonical/cluster-api-k8s/pkg/cloudinit" ) @@ -13,10 +15,13 @@ func TestNewJoinWorker(t *testing.T) { config, err := cloudinit.NewJoinWorker(cloudinit.JoinWorkerInput{ BaseUserData: cloudinit.BaseUserData{ - KubernetesVersion: "v1.30.0", - BootCommands: []string{"bootcmd"}, - PreRunCommands: []string{"prerun1", "prerun2"}, - PostRunCommands: []string{"postrun1", "postrun2"}, + KubernetesVersion: "v1.30.0", + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + SnapstoreProxyScheme: "http", + SnapstoreProxyDomain: "snapstore.io", + SnapstoreProxyID: "abcd-1234-xyz", ExtraFiles: []cloudinit.File{{ Path: "/tmp/file", Content: "test file", @@ -38,6 +43,7 @@ func TestNewJoinWorker(t *testing.T) { // Verify the run commands. 
g.Expect(config.RunCommands).To(Equal([]string{ "set -x", + "/capi/scripts/configure-snapstore-proxy.sh", "prerun1", "prerun2", "/capi/scripts/install.sh", @@ -60,12 +66,16 @@ func TestNewJoinWorker(t *testing.T) { HaveField("Path", "/capi/scripts/configure-auth-token.sh"), HaveField("Path", "/capi/scripts/configure-node-token.sh"), HaveField("Path", "/capi/scripts/create-sentinel-bootstrap.sh"), + HaveField("Path", "/capi/scripts/configure-snapstore-proxy.sh"), HaveField("Path", "/capi/etc/config.yaml"), HaveField("Path", "/capi/etc/microcluster-address"), HaveField("Path", "/capi/etc/node-name"), HaveField("Path", "/capi/etc/node-token"), HaveField("Path", "/capi/etc/join-token"), - HaveField("Path", "/capi/etc/snap-track"), + HaveField("Path", "/capi/etc/snap-channel"), + HaveField("Path", "/capi/etc/snapstore-proxy-scheme"), + HaveField("Path", "/capi/etc/snapstore-proxy-domain"), + HaveField("Path", "/capi/etc/snapstore-proxy-id"), HaveField("Path", "/tmp/file"), ), "Some /capi/scripts files are missing") } @@ -105,3 +115,74 @@ func TestNewJoinWorkerAirGapped(t *testing.T) { // Verify the run commands is missing install.sh script. 
g.Expect(config.RunCommands).NotTo(ContainElement("/capi/scripts/install.sh")) } + +func TestNewJoinWorkerSnapInstall(t *testing.T) { + t.Run("DefaultSnapInstall", func(t *testing.T) { + g := NewWithT(t) + + config, err := cloudinit.NewJoinWorker(cloudinit.JoinWorkerInput{ + BaseUserData: cloudinit.BaseUserData{ + KubernetesVersion: "v1.30.0", + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + }}) + + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(config.WriteFiles).To(ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Path": Equal(fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionChannel)), + "Content": Equal("1.30-classic/stable"), + }))) + g.Expect(config.WriteFiles).ToNot(ContainElement(HaveField("Path", fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionRevision)))) + g.Expect(config.WriteFiles).ToNot(ContainElement(HaveField("Path", fmt.Sprintf("/capi/etc/snap-%s", cloudinit.InstallOptionLocalPath)))) + }) + + tests := []struct { + name string + snapInstall cloudinit.SnapInstallData + }{ + { + name: "ChannelOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionChannel, + Value: "v1.30/stable", + }, + }, + { + name: "RevisionOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionRevision, + Value: "123", + }, + }, + { + name: "LocalPathOverride", + snapInstall: cloudinit.SnapInstallData{ + Option: cloudinit.InstallOptionLocalPath, + Value: "/path/to/k8s.snap", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + config, err := cloudinit.NewJoinWorker(cloudinit.JoinWorkerInput{ + BaseUserData: cloudinit.BaseUserData{ + KubernetesVersion: "v1.30.0", + SnapInstallData: tt.snapInstall, + BootCommands: []string{"bootcmd"}, + PreRunCommands: []string{"prerun1", "prerun2"}, + PostRunCommands: []string{"postrun1", "postrun2"}, + 
}}) + + g.Expect(err).NotTo(HaveOccurred()) + + g.Expect(config.WriteFiles).To(ContainElement(gstruct.MatchFields(gstruct.IgnoreExtras, gstruct.Fields{ + "Path": Equal(fmt.Sprintf("/capi/etc/snap-%s", tt.snapInstall.Option)), + "Content": Equal(tt.snapInstall.Value), + }))) + }) + } +} diff --git a/pkg/locking/control_plane_init_mutex.go b/pkg/locking/control_plane_init_mutex.go index e5a1e24f..7bc607ca 100644 --- a/pkg/locking/control_plane_init_mutex.go +++ b/pkg/locking/control_plane_init_mutex.go @@ -22,7 +22,6 @@ import ( "encoding/json" "fmt" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -156,7 +155,7 @@ func configMapName(clusterName string) string { func (s semaphore) information() (*information, error) { li := &information{} if err := json.Unmarshal([]byte(s.Data[semaphoreInformationKey]), li); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal semaphore information") + return nil, fmt.Errorf("failed to unmarshal semaphore information: %w", err) } return li, nil } @@ -164,7 +163,7 @@ func (s semaphore) information() (*information, error) { func (s semaphore) setInformation(information *information) error { b, err := json.Marshal(information) if err != nil { - return errors.Wrap(err, "failed to marshal semaphore information") + return fmt.Errorf("failed to marshal semaphore information: %w", err) } s.Data = map[string]string{} s.Data[semaphoreInformationKey] = string(b) diff --git a/pkg/proxy/dial.go b/pkg/proxy/dial.go index d3d80729..3d78b877 100644 --- a/pkg/proxy/dial.go +++ b/pkg/proxy/dial.go @@ -18,12 +18,12 @@ package proxy import ( "context" + "errors" "fmt" "net" "net/http" "time" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/kubernetes" @@ -99,7 +99,7 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn // Warning: Any early 
return should close this connection, otherwise we're going to leak them. connection, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name) if err != nil { - return nil, errors.Wrap(err, "error upgrading connection") + return nil, fmt.Errorf("error upgrading connection: %w", err) } // Create the headers. @@ -136,7 +136,7 @@ func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn dataStream, err := connection.CreateStream(headers) if err != nil { return nil, kerrors.NewAggregate([]error{ - errors.Wrap(err, "error creating forwarding stream"), + fmt.Errorf("error creating forwarding stream: %w", err), connection.Close(), }) } diff --git a/pkg/trace/trace.go b/pkg/trace/trace.go new file mode 100644 index 00000000..2f8457d4 --- /dev/null +++ b/pkg/trace/trace.go @@ -0,0 +1,19 @@ +package trace + +import ( + "crypto/rand" + "math/big" +) + +const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +// NewID generates a random string of length 8. +// This string consists of numbers as well as uppercase and lowercase letters. +func NewID() string { + b := make([]byte, 8) + for i := range b { + num, _ := rand.Int(rand.Reader, big.NewInt(int64(len(charset)))) + b[i] = charset[num.Int64()] + } + return string(b) +} diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go index f3724784..42ad619a 100644 --- a/test/e2e/helpers.go +++ b/test/e2e/helpers.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - "github.com/pkg/errors" "golang.org/x/mod/semver" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -419,7 +418,7 @@ func WaitForControlPlaneToBeReady(ctx context.Context, input WaitForControlPlane } Byf("Getting the control plane %s", klog.KObj(input.ControlPlane)) if err := input.Getter.Get(ctx, key, controlplane); err != nil { - return false, errors.Wrapf(err, "failed to get KCP") + return false, fmt.Errorf("failed to get KCP: %w", err) } desiredReplicas := controlplane.Spec.Replicas