Skip to content

Commit 5326332

Browse files
[Kubernetes secret provider] Send signal to agent if cache is updated (#4371)
* Send signal if cache is updated * Signal tests * Add changelog fragment and fix typos * Update changelog/fragments/1709824109-k8s-secret-provider-trigger-signal.yaml Co-authored-by: Blake Rouse <blake.rouse@elastic.co> * Fix racing condition --------- Co-authored-by: Blake Rouse <blake.rouse@elastic.co>
1 parent 92437db commit 5326332

File tree

3 files changed

+391
-9
lines changed

3 files changed

+391
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Kubernetes secrets provider has been improved to update a kubernetes secret when the secret value changes.
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: elastic-agent
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/elastic-agent/pull/4371
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/elastic-agent/issues/4168

internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go

+35-9
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.Conte
8686
p.clientMx.Unlock()
8787

8888
if !p.config.DisableCache {
89-
go p.updateSecrets(ctx)
89+
go p.updateSecrets(ctx, comm)
9090
}
9191

9292
<-comm.Done()
@@ -102,28 +102,39 @@ func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclien
102102
}
103103

104104
// Update the secrets in the cache every RefreshInterval
105-
func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
105+
func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context, comm corecomp.ContextProviderComm) {
106106
timer := time.NewTimer(p.config.RefreshInterval)
107107
for {
108108
select {
109109
case <-ctx.Done():
110110
return
111111
case <-timer.C:
112-
p.updateCache()
112+
updatedCache := p.updateCache()
113+
if updatedCache {
114+
p.logger.Info("Secrets cache was updated, the agent will be notified.")
115+
comm.Signal()
116+
}
113117
timer.Reset(p.config.RefreshInterval)
114118
}
115119
}
116120
}
117121

118122
// mergeWithCurrent merges the updated map with the cache map.
119123
// This function needs to be called between the mutex lock for the map.
120-
func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData {
124+
func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) (map[string]*secretsData, bool) {
121125
merged := make(map[string]*secretsData)
126+
updatedCache := false
122127

123128
for name, data := range p.secretsCache {
124129
diff := time.Since(data.lastAccess)
125130
if diff < p.config.TTLDelete {
126131
merged[name] = data
132+
// Check if this key is part of the updatedMap. If it is not, we know the secrets cache was updated,
133+
// and we need to signal that.
134+
_, ok := updatedMap[name]
135+
if !ok {
136+
updatedCache = true
137+
}
127138
}
128139
}
129140

@@ -132,14 +143,20 @@ func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secr
132143
// it could have been updated when trying to fetch the secret at the same time we are running update cache.
133144
// In that case, we only update the value.
134145
if _, ok := merged[name]; ok {
135-
merged[name].value = data.value
146+
if merged[name].value != data.value {
147+
merged[name].value = data.value
148+
updatedCache = true
149+
}
136150
}
137151
}
138152

139-
return merged
153+
return merged, updatedCache
140154
}
141155

142-
func (p *contextProviderK8sSecrets) updateCache() {
156+
func (p *contextProviderK8sSecrets) updateCache() bool {
157+
// Keep track whether the cache had values changing, so we can notify the agent
158+
updatedCache := false
159+
143160
// deleting entries does not free the memory, so we need to create a new map
144161
// to place the secrets we want to keep
145162
cacheTmp := make(map[string]*secretsData)
@@ -152,6 +169,8 @@ func (p *contextProviderK8sSecrets) updateCache() {
152169
}
153170
p.secretsCacheMx.RUnlock()
154171

172+
// The only way to update an entry in the cache is through the last access time (to delete the key)
173+
// or if the value gets updated.
155174
for name, data := range copyMap {
156175
diff := time.Since(data.lastAccess)
157176
if diff < p.config.TTLDelete {
@@ -162,17 +181,24 @@ func (p *contextProviderK8sSecrets) updateCache() {
162181
lastAccess: data.lastAccess,
163182
}
164183
cacheTmp[name] = newData
184+
if value != data.value {
185+
updatedCache = true
186+
}
165187
}
166-
188+
} else {
189+
updatedCache = true
167190
}
168191
}
169192

170193
// While the cache was updated, it is possible that some secret was added through another go routine.
171194
// We need to merge the updated map with the current cache map to catch the new entries and avoid
172195
// loss of data.
196+
var updated bool
173197
p.secretsCacheMx.Lock()
174-
p.secretsCache = p.mergeWithCurrent(cacheTmp)
198+
p.secretsCache, updated = p.mergeWithCurrent(cacheTmp)
175199
p.secretsCacheMx.Unlock()
200+
201+
return updatedCache || updated
176202
}
177203

178204
func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) {

0 commit comments

Comments
 (0)