[Kubernetes leader election] Run leader elector at all times #4542
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user's deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80ish-character description of the change.
summary: Fix issue where the kubernetes_leaderelection provider would not try to reacquire the lease once lost

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it is able to fill in the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4542

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4543
@@ -0,0 +1,229 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kubernetesleaderelection

import (
	"context"
	"os"
	"testing"
	"time"

	autodiscoverK8s "github.com/elastic/elastic-agent-autodiscover/kubernetes"

	"github.com/stretchr/testify/require"
	v1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	k8sfake "k8s.io/client-go/kubernetes/fake"

	"github.com/elastic/elastic-agent-libs/logp"

	ctesting "github.com/elastic/elastic-agent/internal/pkg/composable/testing"
	"github.com/elastic/elastic-agent/internal/pkg/config"
)

const namespace = "default"
const leaseName = "agent-lease-test"

// createLease creates a new lease resource
func createLease() *v1.Lease {
	lease := &v1.Lease{
		ObjectMeta: metav1.ObjectMeta{
			Name:      leaseName,
			Namespace: namespace,
		},
	}
	return lease
}

// applyLease applies the lease
func applyLease(client kubernetes.Interface, lease *v1.Lease, firstTime bool) error {
	var err error
	if firstTime {
		_, err = client.CoordinationV1().Leases(namespace).Create(context.Background(), lease, metav1.CreateOptions{})
		return err
	}
	_, err = client.CoordinationV1().Leases(namespace).Update(context.Background(), lease, metav1.UpdateOptions{})
	return err
}

// getLeaseHolder returns the holder identity of the lease
func getLeaseHolder(client kubernetes.Interface) (string, error) {
	lease, err := client.CoordinationV1().Leases(namespace).Get(context.Background(), leaseName, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	holder := lease.Spec.HolderIdentity
	if holder == nil {
		return "", err
	}
	return *holder, nil
}

// TestNewLeaderElectionManager will test the leader elector.
// We will try to check if an instance can acquire the lease more than one time. This way, we will know that
// the leader elector starts running again after it has stopped - which happens once a leader loses the lease.
// To make sure that happens we will do the following:
//  1. We will create the lease to be used by the leader elector.
//  2. We will create two context providers - in the default context, this would mean two nodes, each one with an agent running.
//     We will wait for one of the agents, agent1, to acquire the lease before starting the other.
//  3. We force the lease to be acquired by the other agent, agent2.
//  4. We will force the lease to be acquired by agent1 again. To avoid agent2 reacquiring it multiple times,
//     we will stop that provider and make sure agent1 can reacquire it.
func TestNewLeaderElectionManager(t *testing.T) {
	client := k8sfake.NewSimpleClientset()

	lease := createLease()
	// create the lease that leader election will be using
	err := applyLease(client, lease, true)
	require.NoError(t, err)

	// Create the provider
	logger := logp.NewLogger("test_leaderelection")

	leaseDuration := 3
	leaseRenewDeadline := 2
	leaseRetryPeriod := 1

	c := map[string]interface{}{
		"leader_lease":         leaseName,
		"leader_leaseduration": leaseDuration,
		"leader_renewdeadline": leaseRenewDeadline,
		"leader_retryperiod":   leaseRetryPeriod,
	}
	cfg, err := config.NewConfigFrom(c)
	require.NoError(t, err)

	getK8sClientFunc = func(kubeconfig string, opt autodiscoverK8s.KubeClientOptions) (kubernetes.Interface, error) {
		return client, nil
	}
	require.NoError(t, err)

	podNames := [2]string{"agent1", "agent2"}
	cancelFuncs := [2]context.CancelFunc{}

	done := make(chan int, 1)

	// Create two leader election providers representing two agents running
	for i := 0; i < 2; i++ {
		p, err := ContextProviderBuilder(logger, cfg, true)
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(context.Background())
		cancelFuncs[i] = cancel
		defer cancel()

		comm := ctesting.NewContextComm(ctx)

		err = os.Setenv("POD_NAME", podNames[i])
		if err != nil {
			require.FailNow(t, "Failed to set pod name environment variable.")
		}
		go func() {
			_ = p.Run(ctx, comm)
		}()

		if i == 1 {
			break
		}

		// We need to wait for the first agent to acquire the lease, so we can set the POD_NAME environment variable again
		go func() {
			expectedLeader := leaderElectorPrefix + podNames[i]
			for {
				holder, err := getLeaseHolder(client)
				require.NoError(t, err)

				if holder == expectedLeader {
					done <- 1
					break
				}
			}
		}()

		select {
		case <-done:
		case <-time.After(time.Duration(leaseDuration+leaseRetryPeriod) * 30 * time.Second):
			require.FailNow(t, "Timeout"+
				" while waiting for the first pod to acquire the lease. This should not happen. Consider increasing "+
				"the timeout.")
		}
	}

	go func() {
		// At this point the current holder is agent1. Let's change it to agent2.
		for {
			// Force the lease to be applied again, so a new leader is elected.
			intermediateHolder := "does-not-matter"
			lease.Spec.HolderIdentity = &intermediateHolder
			err = applyLease(client, lease, false)
			require.NoError(t, err)

			var currentHolder string
			for {
				currentHolder, err = getLeaseHolder(client)
				require.NoError(t, err)

				// In this case, we already have an agent as holder
				if currentHolder == leaderElectorPrefix+podNames[0] || currentHolder == leaderElectorPrefix+podNames[1] {
Review comment: I am not adding a timeout to this for loop because it is guaranteed to terminate, since there are only 2 pods running to acquire the lease.
					break
				}
			}

			if currentHolder == leaderElectorPrefix+podNames[1] {
				done <- 1
				break
			}
		}
	}()

	select {
	case <-done:
	case <-time.After(time.Duration(leaseDuration+leaseRetryPeriod) * 30 * time.Second):
		require.FailNow(t, "Timeout"+
			" while waiting for agent2 to acquire the lease. This should not happen. Consider increasing "+
			"the timeout.")
	}
	// Now that the holder is agent2, let's wait for agent1 to be reelected.
	// To avoid having to wait very long, the context of agent2 will be canceled so the leader elector will not be
	// running anymore. This way there is only one instance fighting to acquire the lease.
	cancelFuncs[1]()

	go func() {
		for {
			// Force the lease to be applied again, so a new leader is elected.
			intermediateHolder := "does-not-matter"
			lease.Spec.HolderIdentity = &intermediateHolder
			err = applyLease(client, lease, false)
			require.NoError(t, err)

			var currentHolder string
			for {
				currentHolder, err = getLeaseHolder(client)
				require.NoError(t, err)

				// In this case, we already have an agent as holder
				if currentHolder == leaderElectorPrefix+podNames[0] || currentHolder == leaderElectorPrefix+podNames[1] {
					break
				}
			}

			if currentHolder == leaderElectorPrefix+podNames[0] {
				done <- 1
				break
			}
		}
	}()

	select {
	case <-done:
	case <-time.After(time.Duration(leaseDuration+leaseRetryPeriod) * 30 * time.Second):
		require.FailNow(t, "Timeout"+
			" while waiting for agent1 to reacquire the lease. This should not happen. Consider increasing "+
			"the timeout.")
	}

	cancelFuncs[0]()
}
Review comment: If there is a cluster error resulting in the leader continuously losing the lease, will this result in attempts to acquire it as quickly as possible, with no rate limit? I looked at the implementation of Run. There have been several escalations showing that failing to appropriately rate limit k8s control plane API calls like leader election can destabilize clusters. What testing have we done to ensure this change won't cause issues like this?
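For context on the rate-limiting concern, here is a minimal sketch of the knobs exposed by client-go's leaderelection package, assuming the provider wraps k8s.io/client-go/tools/leaderelection (the leader_leaseduration, leader_renewdeadline, and leader_retryperiod settings in the test map onto these fields). The lease name, namespace, identity, and durations below are illustrative, not the provider's defaults.

package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runElector campaigns for a Lease-based lock. RetryPeriod is the wait between
// acquire/renew attempts, so even a candidate that keeps losing the lease only
// polls the API server about once per RetryPeriod (with jitter), not in a tight loop.
func runElector(ctx context.Context, client kubernetes.Interface, id string) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "agent-lease", Namespace: "default"}, // illustrative
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second, // how long a lease is valid before others may claim it
		RenewDeadline: 10 * time.Second, // the leader must renew within this window
		RetryPeriod:   2 * time.Second,  // minimum wait between acquire/renew attempts
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {}, // start leader-only work
			OnStoppedLeading: func() {},                    // stop leader-only work
		},
	})
}

RunOrDie blocks until the context is cancelled or leadership is lost; the acquire loop inside it polls the API server at most once per RetryPeriod plus jitter, which is what bounds the call rate discussed above.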
Reply: We have already been doing this. This change will not affect the number of calls; it just makes sure that at least 1 agent will be reporting metrics. The problem with the current implementation is how run behaves: the provider acquires the lease once and, after losing it, the leader elector stops and never tries to reacquire it. All the while, all the other instances were already trying to acquire the lease.

We do not have many SDHs on this bug, which leads me to believe that it is rare for an agent to lose the lease. But we do have problems when agents stop reporting metrics, and the only way we knew to restart that was to make the pod run again - that is, force run() to run again.
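As a rough illustration of the "run the leader elector at all times" behavior described in the reply (and in the PR title), here is a sketch that simply restarts the election round whenever it returns. This is not the actual provider code, just the shape of the fix under the same client-go assumption as above.

package example

import (
	"context"

	"k8s.io/client-go/tools/leaderelection"
)

// runForever keeps campaigning: RunOrDie returns once leadership is lost, so we
// loop and start a new election round until the context is cancelled. client-go
// still waits RetryPeriod between acquire attempts, so the call rate is unchanged.
func runForever(ctx context.Context, cfg leaderelection.LeaderElectionConfig) {
	for {
		leaderelection.RunOrDie(ctx, cfg) // campaign, lead, return on loss
		if ctx.Err() != nil {
			return // provider is shutting down
		}
	}
}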
Reply: There are some parameters in the config regarding the lease (elastic-agent/internal/pkg/composable/providers/kubernetesleaderelection/config.go, lines 17 to 19 at 43cb148), in case it becomes necessary to reduce the number of times an agent tries to acquire it.
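For completeness, a sketch of tuning those settings through the same configuration keys the test exercises. The lease name and numbers are placeholders rather than recommended values, the exact field types and defaults live in config.go (not part of this diff), and it assumes config.NewConfigFrom returns (*config.Config, error), as its use in the test suggests.

package example

import "github.com/elastic/elastic-agent/internal/pkg/config"

// slowerLeaseConfig builds a provider config that retries lease acquisition
// less aggressively than the test's 3/2/1 values. Key names match the test;
// the lease name and numbers are illustrative only.
func slowerLeaseConfig() (*config.Config, error) {
	c := map[string]interface{}{
		"leader_lease":         "agent-lease-example",
		"leader_leaseduration": 60,
		"leader_renewdeadline": 45,
		"leader_retryperiod":   30,
	}
	return config.NewConfigFrom(c)
}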