
The Kubernetes secrets provider should regenerate the agent configuration when a secrets change is detected #4168

Closed
cmacknz opened this issue Jan 31, 2024 · 12 comments · Fixed by #4371
Labels: Team:Cloudnative-Monitoring, Team:Elastic-Agent

Comments

@cmacknz (Member) commented Jan 31, 2024

The Kubernetes secrets provider today implements the Fetch context provider interface, which provides the secret value whenever the agent configuration is being regenerated for another reason.

// FetchContextProvider is the interface that a context provider uses to allow variable values to be determined when
// the configuration is rendered versus it being known in advance.
type FetchContextProvider interface {
	ContextProvider
	// Fetch tries to fetch a value for a variable.
	Fetch(string) (string, bool)
}

In cases where secrets are short-lived and rotated frequently, this can result in the agent policy using stale secret values. In general, there is currently always a risk that a secret remains stale if the agent configuration is not regenerated because of another detected change in the system, for example a policy change.

The Kubernetes secrets provider should instead implement the Comm (or communicating) context provider interface, which informs the agent when a change is detected so the running configuration can be regenerated.

// ContextProviderComm is the interface that a context provider uses to communicate back to Elastic Agent.
type ContextProviderComm interface {
	context.Context
	// Set sets the current mapping for this context.
	Set(map[string]interface{}) error
}

For example, this is how the Kubernetes leader election provider is implemented:

// Run runs the leaderelection provider.
func (p *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {

func (p *contextProvider) startLeading(comm corecomp.ContextProviderComm) {
	mapping := map[string]interface{}{
		"leader": true,
	}
	err := comm.Set(mapping)
	if err != nil {
		p.logger.Errorf("Failed updating leaderelection status to leader TRUE: %s", err)
	}
}

Since the secrets provider now implements an optional cache, whenever the cache refreshes we could detect if a value has changed and notify the agent.

// Update the secrets in the cache every RefreshInterval
func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
	timer := time.NewTimer(p.config.RefreshInterval)
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			p.updateCache()
			timer.Reset(p.config.RefreshInterval)
		}
	}
}
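
For illustration only, a minimal sketch of that change detection, assuming the refresh keeps a snapshot of the previous cache to compare against (the cacheChanged helper below is hypothetical, not part of the current provider):

// Sketch only: report whether any secret value changed between two cache
// snapshots, so the refresh loop above could notify the agent on change.
func cacheChanged(oldCache, newCache map[string]*secretsData) bool {
	if len(oldCache) != len(newCache) {
		return true
	}
	for key, data := range newCache {
		prev, ok := oldCache[key]
		if !ok || prev.value != data.value {
			return true
		}
	}
	return false
}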

Ideally we would use a watcher on the secret values, but this has been found to have performance and memory usage concerns.

cmacknz added the Team:Elastic-Agent and Team:Cloudnative-Monitoring labels on Jan 31, 2024
@elasticmachine (Contributor)

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

@constanca-m (Contributor)

Hey @cmacknz, is this issue still ongoing? Is this still a priority?

@adamkasztenny

@constanca-m @cmacknz We're still running into this issue.

@constanca-m (Contributor)

The ContextProviderComm interface has

// Set sets the current mapping for this context.
Set(map[string]interface{}) error

But I am having trouble understanding what this mapping is. Would this be our secret cache? I tried to check the other mappings using the same function but they did not make it any clearer. Is there a way to see the output of the context, like using elastic-agent inspect...?

@cmacknz (Member, Author) commented Mar 6, 2024

You would want one mapping entry for each secret the provider is populating in the policy.

The leader election provider seems like the simplest example:

mapping := map[string]interface{}{
	"leader": true,
}

This is dynamically setting kubernetes_leaderelection.leader: true in the agent policy, which causes conditions like condition: ${kubernetes_leaderelection.leader} == true to be re-evaluated when it changes.
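
By analogy, the secrets provider would set one mapping entry per secret it resolves, keyed the way the policy references it. A hypothetical sketch (the secret name, key, and value below are made up):

// Hypothetical example: each resolved secret becomes one mapping entry, so a
// changed value re-triggers variable substitution in the policy.
mapping := map[string]interface{}{
	"default.mysecret.api_key": "current-secret-value",
}
err := comm.Set(mapping)
if err != nil {
	// handle or log the error
}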

@constanca-m (Contributor)

Thank you @cmacknz. I tried to do it like this:

if updatedCache := p.updateCache(); updatedCache {
	mapping := map[string]interface{}{}
	p.secretsCacheMx.RLock()
	for key, data := range p.secretsCache {
		mapping[key] = data.value
	}
	p.secretsCacheMx.RUnlock()
	err := comm.Set(mapping)
...

All this does is add one entry per secret, with the secret key and its value (just the string, not the full value with the last-accessed timestamp). However, this ends up breaking something in the agent, and when I look at Discover nothing is there. I see a warning in the logs that was not there in the current version I tested, but it does not help to track down what is wrong in the mapping:

{"log.level":"warn","@timestamp":"2024-03-06T15:34:30.139Z","message":"Cannot index event (status=400): dropping event! Enable debug logs to view the event and cause.","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"filestream-monitoring","type":"filestream"},"log":{"source":"filestream-monitoring"},"log.logger":"elasticsearch","log.origin":{"file.line":450,"file.name":"elasticsearch/client.go","function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).bulkCollectPublishFails"},"service.name":"filebeat","ecs.version":"1.6.0","ecs.version":"1.6.0"}

@cmacknz (Member, Author) commented Mar 6, 2024

You'll need to turn on debug logging to see what is wrong. The event can't be accepted by Elasticsearch for some reason (mapping conflict is the most common reason).

@constanca-m (Contributor) commented Mar 6, 2024

Ok, I will do that. I have another question though.

When we use the FetchContextProvider we have access to all the Kubernetes secrets variables, such as ${kubernetes_secrets. ...}, through the Fetch function, but when we use ContextProviderComm we no longer have that. We could follow the same logic and try to implement all the functions (I don't know if it works), but is there an easier way to have access to all these variables at the start of the Run function?

They are not part of the config.

@cmacknz (Member, Author) commented Mar 6, 2024

I think that is likely part of the reason why the kubernetessecrets provider only implements a fetch context provider: it doesn't have an easy way to know the secrets in advance. The secrets provider is the only provider implementing Fetch.

I think what you probably want is to implement both Fetch and Set; this might turn into a larger change. You can get the list of secrets to watch from the initial calls to Fetch when the secrets are first introduced into the policy.

Then as the cache is refreshed you can call Set if you detect that the current value changed from the previous value.
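
A rough sketch of the first half of that idea, where Fetch also records every key the policy asks for so the refresh loop knows which secrets to re-check (trackKey and lookup below are hypothetical helpers, not existing provider methods):

// Sketch only: remember each ${kubernetes_secrets...} key requested by the
// policy, then resolve it the same way the provider already does.
func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) {
	p.trackKey(key)      // hypothetical: store the key for the refresh loop
	return p.lookup(key) // hypothetical: the existing secret resolution
}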

@blakerouse might have a better way to deal with this, he's more familiar with the provider implementations than I am.

@blakerouse (Contributor)

I think a change needs to be made to the interface to support this. This should work:

diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go
index 02ad5186be..32830a479e 100644
--- a/internal/pkg/composable/controller.go
+++ b/internal/pkg/composable/controller.go
@@ -300,6 +300,21 @@ type contextProviderState struct {
        signal   chan bool
 }
 
+// Signal signals that something has changed in the provider.
+//
+// Note: This should only be used by fetch context providers, standard context
+// providers should use Set to update the overall state.
+func (c *contextProviderState) Signal() {
+       // Notify the controller Run loop that a state has changed. The notification
+       // channel has buffer size 1 so this ensures that an update will always
+       // happen after this change, while coalescing multiple simultaneous changes
+       // into a single controller update.
+       select {
+       case c.signal <- true:
+       default:
+       }
+}
+
 // Set sets the current mapping.
 func (c *contextProviderState) Set(mapping map[string]interface{}) error {
        var err error
@@ -321,15 +336,7 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error {
                return nil
        }
        c.mapping = mapping
-
-       // Notify the controller Run loop that a state has changed. The notification
-       // channel has buffer size 1 so this ensures that an update will always
-       // happen after this change, while coalescing multiple simultaneous changes
-       // into a single controller update.
-       select {
-       case c.signal <- true:
-       default:
-       }
+       c.Signal()
        return nil
 }
 
diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go
index 4bcf90470b..2481520934 100644
--- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go
+++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go
@@ -35,6 +35,9 @@ type contextProviderK8sSecrets struct {
        clientMx sync.Mutex
        client   k8sclient.Interface
 
+       commMx sync.Mutex
+       comm   corecomp.ContextProviderComm
+
        secretsCacheMx sync.RWMutex
        secretsCache   map[string]*secretsData
 }
@@ -84,6 +87,9 @@ func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.Conte
        p.clientMx.Lock()
        p.client = client
        p.clientMx.Unlock()
+       p.commMx.Lock()
+       p.comm = comm
+       p.commMx.Unlock()
 
        if !p.config.DisableCache {
                go p.updateSecrets(ctx)
@@ -94,6 +100,9 @@ func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.Conte
        p.clientMx.Lock()
        p.client = nil
        p.clientMx.Unlock()
+       p.commMx.Lock()
+       p.comm = nil
+       p.commMx.Unlock()
        return comm.Err()
 }
 
@@ -110,6 +119,8 @@ func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
                        return
                case <-timer.C:
                        p.updateCache()
+                       // TODO: Change to only call signal in the case that something has changed.
+                       p.signal()
                        timer.Reset(p.config.RefreshInterval)
                }
        }
@@ -139,6 +150,12 @@ func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secr
        return merged
 }
 
+func (p *contextProviderK8sSecrets) signal() {
+       p.commMx.Lock()
+       defer p.commMx.Unlock()
+       p.comm.Signal()
+}
+
 func (p *contextProviderK8sSecrets) updateCache() {
        // deleting entries does not free the memory, so we need to create a new map
        // to place the secrets we want to keep
diff --git a/internal/pkg/core/composable/providers.go b/internal/pkg/core/composable/providers.go
index 1c4018fd47..b7aee80b69 100644
--- a/internal/pkg/core/composable/providers.go
+++ b/internal/pkg/core/composable/providers.go
@@ -19,6 +19,12 @@ type FetchContextProvider interface {
 type ContextProviderComm interface {
        context.Context
 
+       // Signal signals that something has changed in the provider.
+       //
+       // Note: This should only be used by fetch context providers, standard context
+       // providers should use Set to update the overall state.
+       Signal()
+
        // Set sets the current mapping for this context.
        Set(map[string]interface{}) error
 }

That should trigger an update when something changes, without having to call Set, which this provider should not do.
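
Building on that diff, one hedged way to resolve its TODO is to have updateCache report whether anything actually changed and only signal then (the boolean return value is an assumption about how updateCache could be reworked, not its current signature):

// Sketch only: signal the controller only when the refreshed cache differs
// from the previous one.
func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
	timer := time.NewTimer(p.config.RefreshInterval)
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			if changed := p.updateCache(); changed {
				p.signal()
			}
			timer.Reset(p.config.RefreshInterval)
		}
	}
}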

@blakerouse (Contributor)

I went ahead and created PR #4368 to add the Signal method. That will allow the kubernetes secrets provider to just call comm.Signal() when it's ready.

@constanca-m (Contributor)

Thank you @blakerouse for such quick action. I will have a look and test whether it works, but at first glance it seems to be exactly what is needed.
