@@ -61,6 +61,7 @@ type FetcherDefault struct {
61
61
watching []url.URL
62
62
63
63
lock sync.Mutex
64
+ wg sync.WaitGroup
64
65
}
65
66
66
67
func NewFetcherDefault (
@@ -119,9 +120,7 @@ func (f *FetcherDefault) configUpdate(ctx context.Context, watcher *fsnotify.Wat
119
120
}
120
121
}
121
122
for _ , source := range replace {
122
- go func (s url.URL ) {
123
- events <- event {et : eventFileChanged , path : s , source : "config_update" }
124
- }(source )
123
+ f .enqueueEvent (events , event {et : eventFileChanged , path : source , source : "config_update" })
125
124
}
126
125
return nil
127
126
}
@@ -162,8 +161,13 @@ func (f *FetcherDefault) Watch(ctx context.Context) error {
162
161
defer watcher .Close ()
163
162
164
163
events := make (chan event )
165
- defer close (events )
166
- return f .watch (ctx , watcher , events )
164
+ err = f .watch (ctx , watcher , events )
165
+
166
+ // Close the channel only when all child goroutines exit
167
+ f .wg .Wait ()
168
+ close (events )
169
+
170
+ return err
167
171
}
168
172
169
173
func (f * FetcherDefault ) watch (ctx context.Context , watcher * fsnotify.Watcher , events chan event ) error {
@@ -176,16 +180,12 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
176
180
return nil
177
181
}
178
182
179
- go func () {
180
- events <- event {et : eventRepositoryConfigChange , source : "viper_watcher" }
181
- }()
183
+ f .enqueueEvent (events , event {et : eventRepositoryConfigChange , source : "viper_watcher" })
182
184
183
185
return nil
184
186
})
185
187
186
- go func () {
187
- events <- event {et : eventRepositoryConfigChange , source : "entrypoint" }
188
- }()
188
+ f .enqueueEvent (events , event {et : eventRepositoryConfigChange , source : "entrypoint" })
189
189
190
190
for {
191
191
select {
@@ -199,9 +199,7 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
199
199
f .r .Logger ().
200
200
Debugf ("Detected that a access rule repository file has been removed, reloading config." )
201
201
// If a file was removed it's likely that the config changed as well - reload!
202
- go func () {
203
- events <- event {et : eventRepositoryConfigChange , source : "fsnotify_remove" }
204
- }()
202
+ f .enqueueEvent (events , event {et : eventRepositoryConfigChange , source : "fsnotify_remove" })
205
203
continue
206
204
}
207
205
@@ -216,9 +214,7 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
216
214
WithField ("op" , e .Op .String ()).
217
215
Debugf ("Detected access rule repository file change." )
218
216
219
- go func () {
220
- events <- event {et : eventFileChanged , path : * source , source : "fsnotify_update" }
221
- }()
217
+ f .enqueueEvent (events , event {et : eventFileChanged , path : * source , source : "fsnotify_update" })
222
218
case e , ok := <- events :
223
219
if ! ok {
224
220
// channel was closed
@@ -255,6 +251,15 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
255
251
}
256
252
}
257
253
254
+ func (f * FetcherDefault ) enqueueEvent (events chan event , evt event ) {
255
+ f .wg .Add (1 )
256
+ go func () {
257
+ defer f .wg .Done ()
258
+
259
+ events <- evt
260
+ }()
261
+ }
262
+
258
263
func (f * FetcherDefault ) fetch (source url.URL ) ([]Rule , error ) {
259
264
switch source .Scheme {
260
265
case "http" :
0 commit comments