[Kubernetes leader election] Run leader elector at all times #4542

Merged · 18 commits · Apr 22, 2024
32 changes: 32 additions & 0 deletions changelog/fragments/1712583231-leader-election-issue.yaml
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix issue where kubernetes_leaderelection provider would not try to reacquire the lease once lost

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4542

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4543
@@ -10,10 +10,12 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/composable"
@@ -22,6 +24,8 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const leaderElectorPrefix = "elastic-agent-leader-"

func init() {
composable.Providers.MustAddContextProvider("kubernetes_leaderelection", ContextProviderBuilder)
}
@@ -45,11 +49,15 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo
return &contextProvider{logger, &cfg, nil}, nil
}

// This is needed to overwrite the Kubernetes client for the tests
var getK8sClientFunc = func(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
return kubernetes.GetKubernetesClient(kubeconfig, opt)
}

// Run runs the leaderelection provider.
func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig, p.config.KubeClientOptions)
client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes leaderelection provider skipped, unable to connect: %s", err)
return nil
}
@@ -61,9 +69,9 @@ func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider
var id string
podName, found := os.LookupEnv("POD_NAME")
if found {
id = "elastic-agent-leader-" + podName
id = leaderElectorPrefix + podName
} else {
id = "elastic-agent-leader-" + agentInfo.AgentID()
id = leaderElectorPrefix + agentInfo.AgentID()
}

ns, err := kubernetes.InClusterNamespace()
@@ -104,9 +112,14 @@ func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProvider
p.logger.Errorf("error while creating Leader Elector: %v", err)
}
p.logger.Debugf("Starting Leader Elector")
le.Run(comm)
p.logger.Debugf("Stopped Leader Elector")
return comm.Err()

for {
le.Run(ctx)
Member commented:

If there is a cluster error resulting in the leader continuously losing the lease, will this result in attempts to acquire it as quickly as possible with no rate limit?

The implementation of Run I see is:

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		le.config.Callbacks.OnStoppedLeading()
	}()

	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

There have been several escalations showing that failing to appropriately rate limit k8s control plane API calls like leader election can destabilize clusters.

What testing have we done to ensure this change won't cause issues like this?

@constanca-m (Contributor Author) commented on Apr 8, 2024:

We have already been doing this. This change does not affect the number of calls; it just makes sure that at least one agent will be reporting metrics. The problem with the current implementation is that Run goes like this:

  • Keeps trying to acquire the lease
  • Acquires the lease
  • Loses the lease and stops running

All the while, the other instances were already trying to acquire the lease.

We do not have many SDHs on this bug, which leads me to believe that it is rare for an agent to lose the lease. But we do have problems when agents stop reporting metrics, and the only way we knew to restart that was to make the pod run again - that is, to force Run() to run again.
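To make the pacing point concrete, below is a minimal, self-contained Go sketch of an acquisition loop bounded by a retry period; it only illustrates the behavior described above (tryAcquire and acquireLoop are made-up names for this sketch, not provider or client-go APIs):

package main

import (
	"context"
	"fmt"
	"time"
)

// tryAcquire stands in for a single lease-acquisition attempt against the
// API server; in the real provider this call is made by client-go's elector.
func tryAcquire() bool {
	return false // pretend the lease is held by someone else
}

// acquireLoop makes at most one attempt per retryPeriod, so restarting it in
// an outer loop does not increase the rate of API calls.
func acquireLoop(ctx context.Context, retryPeriod time.Duration) bool {
	ticker := time.NewTicker(retryPeriod)
	defer ticker.Stop()
	for {
		if tryAcquire() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	fmt.Println("acquired:", acquireLoop(ctx, time.Second))
}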

@constanca-m (Contributor Author) added:

There are some parameters in the config regarding the lease:

LeaseDuration int `config:"leader_leaseduration"`
RenewDeadline int `config:"leader_renewdeadline"`
RetryPeriod int `config:"leader_retryperiod"`

These can be tuned if it becomes necessary to reduce how often an agent tries to acquire the lease.
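For context, here is a rough sketch of how lease settings like these are commonly wired into client-go's leader election. This is assumed wiring with integer seconds, not a copy of the provider's actual code; buildLeaderElector and its parameters are illustrative names.

package leaderelectionsketch

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sclient "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// buildLeaderElector shows how a lease duration, renew deadline and retry
// period (in seconds) map onto client-go's LeaderElectionConfig.
func buildLeaderElector(client k8sclient.Interface, leaseName, namespace, id string,
	leaseDuration, renewDeadline, retryPeriod int) (*leaderelection.LeaderElector, error) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: leaseName, Namespace: namespace},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}
	return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: time.Duration(leaseDuration) * time.Second,
		RenewDeadline: time.Duration(renewDeadline) * time.Second,
		RetryPeriod:   time.Duration(retryPeriod) * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* e.g. report leader: true */ },
			OnStoppedLeading: func() { /* e.g. report leader: false */ },
		},
	})
}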

if ctx.Err() != nil {
p.logger.Debugf("Stopped Leader Elector")
return comm.Err()
}
}
}

func (p *contextProvider) startLeading(comm corecomp.ContextProviderComm) {
@@ -0,0 +1,191 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kubernetesleaderelection

import (
"context"
"os"
"testing"

autodiscoverK8s "github.com/elastic/elastic-agent-autodiscover/kubernetes"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
k8sfake "k8s.io/client-go/kubernetes/fake"

"github.com/elastic/elastic-agent-libs/logp"

ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing"
"github.com/elastic/elastic-agent/internal/pkg/config"
)

const namespace = "default"
const leaseName = "agent-lease-test"

// createLease creates a new lease resource
func createLease() *v1.Lease {
lease := &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: leaseName,
Namespace: namespace,
},
}
return lease
}

// applyLease applies the lease
func applyLease(client kubernetes.Interface, lease *v1.Lease, firstTime bool) error {
var err error
if firstTime {
_, err = client.CoordinationV1().Leases(namespace).Create(context.Background(), lease, metav1.CreateOptions{})
return err
}
_, err = client.CoordinationV1().Leases(namespace).Update(context.Background(), lease, metav1.UpdateOptions{})
return err
}

// getLeaseHolder returns the holder identity of the lease
func getLeaseHolder(client kubernetes.Interface) (string, error) {
lease, err := client.CoordinationV1().Leases(namespace).Get(context.Background(), leaseName, metav1.GetOptions{})
if err != nil {
return "", err
}
holder := lease.Spec.HolderIdentity
if holder == nil {
return "", err
}
return *holder, nil
}

// TestNewLeaderElectionManager will test the leader elector.
// We will try to check if an instance can acquire the lease more than one time. This way, we will know that
// the leader elector starts running again after it has stopped - which happens once a leader loses the lease.
// To make sure that happens we will do the following:
// 1. We will create the lease to be used by the leader elector.
// 2. We will create two context providers - in the default context, this would mean two nodes, each one with an agent running.
// We will wait for one of the agents, agent1, to acquire the lease, before starting the other.
// 3. We force the lease to be acquired by the other agent, agent2.
// 4. We will force the lease to be acquired by agent1 again. To avoid agent2 reacquiring it multiple times,
// we will stop agent2's provider and make sure agent1 can reacquire it.
func TestNewLeaderElectionManager(t *testing.T) {
client := k8sfake.NewSimpleClientset()

lease := createLease()
// create the lease that leader election will be using
err := applyLease(client, lease, true)
require.NoError(t, err)

// Create the provider
logger := logp.NewLogger("test_leaderelection")

leaseDuration := 3
leaseRenewDeadline := 2
leaseRetryPeriod := 1

c := map[string]interface{}{
"leader_lease": leaseName,
"leader_leaseduration": leaseDuration,
"leader_renewdeadline": leaseRenewDeadline,
"leader_retryperiod": leaseRetryPeriod,
}
cfg, err := config.NewConfigFrom(c)
require.NoError(t, err)

getK8sClientFunc = func(kubeconfig string, opt autodiscoverK8s.KubeClientOptions) (kubernetes.Interface, error) {
return client, nil
}
Contributor commented:

It would be nice to set getK8sClientFunc back to the original after the test. Something like:

defer func() {
   getK8sClientFunc = defaultGetK8sClientFunc
}()

require.NoError(t, err)

podNames := [2]string{"agent1", "agent2"}
cancelFuncs := [2]context.CancelFunc{}

// Create two leader election providers representing two agents running
for i := 0; i < 2; i++ {
p, err := ContextProviderBuilder(logger, cfg, true)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancelFuncs[i] = cancel
comm := ctesting.NewContextComm(ctx)

err = os.Setenv("POD_NAME", podNames[i])
if err != nil {
require.FailNow(t, "Failed to set pod name environment variable.")
}
go func() {
_ = p.Run(ctx, comm)
}()

if i == 1 {
break
}

// We need to wait for the first agent to acquire the lease, so we can set the POD_NAME environment variable again
expectedLeader := leaderElectorPrefix + podNames[i]
for {
Contributor commented:

What happens here if holder == expectedLeader never becomes true? It seems this will loop forever.

It might be better to switch to require.Eventually with a timeout, to ensure that a break somewhere in the code doesn't leave this blocking the unit test forever.
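For illustration, the suggested check could look roughly like the sketch below; the timeout and polling interval are arbitrary values, and a "time" import would be needed in the test file.

// Sketch of the suggested require.Eventually pattern for this wait loop.
require.Eventually(t, func() bool {
	holder, err := getLeaseHolder(client)
	return err == nil && holder == expectedLeader
}, 30*time.Second, 100*time.Millisecond,
	"expected %s to become the lease holder", expectedLeader)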

@constanca-m (Contributor Author) replied:

I added a timeout. I explained in this comment why I removed it before #4542 (comment).

holder, err := getLeaseHolder(client)
require.NoError(t, err)

if holder == expectedLeader {
break
}
}
}

// At this point the current holder is agent1. Let's change it to agent2.
for {
// Force the lease to be applied again, so a new leader is elected.
intermediateHolder := "does-not-matter"
lease.Spec.HolderIdentity = &intermediateHolder
err = applyLease(client, lease, false)
require.NoError(t, err)

var currentHolder string
for {
currentHolder, err = getLeaseHolder(client)
require.NoError(t, err)

// In this case, we already have an agent as holder
if currentHolder == leaderElectorPrefix+podNames[0] || currentHolder == leaderElectorPrefix+podNames[1] {
break
}
}

if currentHolder == leaderElectorPrefix+podNames[1] {
break
}
}

// Now that the holder is agent2, let's wait for agent1 to be reelected.
// To avoid having to wait very long, the context of agent2 will be canceled so the leader elector will not be
// running anymore. This way there is only one instance fighting to acquire the lease.
cancelFuncs[1]()
for {
// Force the lease to be applied again, so a new leader is elected.
intermediateHolder := "does-not-matter"
lease.Spec.HolderIdentity = &intermediateHolder
err = applyLease(client, lease, false)
require.NoError(t, err)

var currentHolder string
for {
currentHolder, err = getLeaseHolder(client)
require.NoError(t, err)

// In this case, we already have an agent as holder
if currentHolder == leaderElectorPrefix+podNames[0] || currentHolder == leaderElectorPrefix+podNames[1] {
break
}
}

if currentHolder == leaderElectorPrefix+podNames[0] {
break
}
}

cancelFuncs[0]()
}