Skip to content

Commit 383fba1

Browse files
committed
nsqlookupd: fix write lock starvation
1 parent 53ee4e4 commit 383fba1

File tree

2 files changed

+50
-70
lines changed

2 files changed

+50
-70
lines changed

nsqlookupd/http.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,9 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
367367
defer s.ctx.nsqlookupd.DB.RUnlock()
368368

369369
data := make(map[string][]map[string]interface{})
370-
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap {
370+
s.ctx.nsqlookupd.DB.registrationMap.Range(func(k, v interface{}) bool {
371+
producers := v.(ProducerMap)
372+
r := k.(Registration)
371373
key := r.Category + ":" + r.Key + ":" + r.SubKey
372374
for _, p := range producers {
373375
m := map[string]interface{}{
@@ -383,7 +385,8 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
383385
}
384386
data[key] = append(data[key], m)
385387
}
386-
}
388+
return true
389+
})
387390

388391
return data, nil
389392
}

nsqlookupd/registration_db.go

Lines changed: 45 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,11 @@ import (
55
"sync"
66
"sync/atomic"
77
"time"
8-
9-
"github.com/patrickmn/go-cache"
108
)
119

1210
type RegistrationDB struct {
1311
sync.RWMutex
14-
registrationMap map[Registration]ProducerMap
15-
16-
cachedMutex sync.RWMutex
17-
cachedFindProducersResults *cache.Cache
12+
registrationMap *sync.Map
1813
}
1914

2015
type MetaDB struct {
@@ -64,8 +59,7 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
6459

6560
func NewRegistrationDB() *RegistrationDB {
6661
return &RegistrationDB{
67-
registrationMap: make(map[Registration]ProducerMap),
68-
cachedFindProducersResults: cache.New(1*time.Minute, 5*time.Minute),
62+
registrationMap: &sync.Map{},
6963
}
7064
}
7165

@@ -77,12 +71,7 @@ func NewMetaDB() *MetaDB {
7771

7872
// add a registration key
7973
func (r *RegistrationDB) AddRegistration(k Registration) {
80-
r.Lock()
81-
defer r.Unlock()
82-
_, ok := r.registrationMap[k]
83-
if !ok {
84-
r.registrationMap[k] = make(map[string]*Producer)
85-
}
74+
r.registrationMap.LoadOrStore(k, make(ProducerMap))
8675
}
8776

8877
// add a registration key
@@ -123,123 +112,111 @@ func (m *MetaDB) FindRegistrations(category string, key string, subkey string) R
123112
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
124113
r.Lock()
125114
defer r.Unlock()
126-
_, ok := r.registrationMap[k]
127-
if !ok {
128-
r.registrationMap[k] = make(map[string]*Producer)
129-
}
130-
producers := r.registrationMap[k]
115+
val, _ := r.registrationMap.LoadOrStore(k, make(ProducerMap))
116+
producers := val.(ProducerMap)
131117
_, found := producers[p.peerInfo.id]
132118
if found == false {
133119
producers[p.peerInfo.id] = p
134120
}
121+
135122
return !found
136123
}
137124

138125
// remove a producer from a registration
139126
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
140127
r.Lock()
141128
defer r.Unlock()
142-
producers, ok := r.registrationMap[k]
129+
value, ok := r.registrationMap.Load(k)
143130
if !ok {
144131
return false, 0
145132
}
133+
producers := value.(ProducerMap)
146134
removed := false
147135
if _, exists := producers[id]; exists {
148136
removed = true
149137
}
150138

151139
// Note: this leaves keys in the DB even if they have empty lists
152140
delete(producers, id)
141+
153142
return removed, len(producers)
154143
}
155144

156145
// remove a Registration and all it's producers
157146
func (r *RegistrationDB) RemoveRegistration(k Registration) {
158-
r.Lock()
159-
defer r.Unlock()
160-
delete(r.registrationMap, k)
147+
r.registrationMap.Delete(k)
161148
}
162149

163150
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
164151
return key == "*" || subkey == "*"
165152
}
166153

167154
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
168-
r.RLock()
169-
defer r.RUnlock()
170155
if !r.needFilter(key, subkey) {
171156
k := Registration{category, key, subkey}
172-
if _, ok := r.registrationMap[k]; ok {
157+
if _, ok := r.registrationMap.Load(k); ok {
173158
return Registrations{k}
174159
}
175160
return Registrations{}
176161
}
177162
results := Registrations{}
178-
for k := range r.registrationMap {
179-
if !k.IsMatch(category, key, subkey) {
180-
continue
163+
r.registrationMap.Range(func(k, _ interface{}) bool {
164+
if k.(Registration).IsMatch(category, key, subkey) {
165+
results = append(results, k.(Registration))
181166
}
182-
results = append(results, k)
183-
}
167+
return true
168+
})
184169
return results
185170
}
186171

187172
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
188-
r.cachedMutex.RLock()
189-
cachedKey := fmt.Sprintf("%s:%s:%s", category, key, subkey)
190-
191-
if val, found := r.cachedFindProducersResults.Get(cachedKey); found {
192-
r.cachedMutex.RUnlock()
193-
return val.(Producers)
194-
}
195-
196-
r.cachedMutex.RUnlock()
197-
198-
r.cachedMutex.Lock()
199-
defer r.cachedMutex.Unlock()
200-
201-
if val, found := r.cachedFindProducersResults.Get(cachedKey); found {
202-
return val.(Producers)
203-
}
204-
205-
r.RLock()
206-
defer r.RUnlock()
207-
208173
if !r.needFilter(key, subkey) {
209174
k := Registration{category, key, subkey}
210-
r.cachedFindProducersResults.Set(cachedKey, ProducerMap2Slice(r.registrationMap[k]), cache.DefaultExpiration)
211-
return ProducerMap2Slice(r.registrationMap[k])
175+
val, _ := r.registrationMap.Load(k)
176+
177+
r.RLock()
178+
defer r.RUnlock()
179+
return ProducerMap2Slice(val.(ProducerMap))
212180
}
213181

182+
r.RLock()
214183
results := make(map[string]struct{})
215184
var retProducers Producers
216-
for k, producers := range r.registrationMap {
217-
if !k.IsMatch(category, key, subkey) {
218-
continue
219-
}
220-
for _, producer := range producers {
221-
_, found := results[producer.peerInfo.id]
222-
if found == false {
223-
results[producer.peerInfo.id] = struct{}{}
224-
retProducers = append(retProducers, producer)
185+
r.registrationMap.Range(func(k, v interface{}) bool {
186+
if k.(Registration).IsMatch(category, key, subkey) {
187+
producers := v.(ProducerMap)
188+
for _, producer := range producers {
189+
_, found := results[producer.peerInfo.id]
190+
if found == false {
191+
results[producer.peerInfo.id] = struct{}{}
192+
retProducers = append(retProducers, producer)
193+
}
225194
}
226195
}
227-
}
228196

229-
r.cachedFindProducersResults.Set(cachedKey, retProducers, cache.DefaultExpiration)
197+
return true
198+
})
199+
200+
r.RUnlock()
230201

231202
return retProducers
232203
}
233204

234205
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
235206
r.RLock()
236-
defer r.RUnlock()
207+
237208
results := Registrations{}
238-
for k, producers := range r.registrationMap {
209+
r.registrationMap.Range(func(k, v interface{}) bool {
210+
producers := v.(ProducerMap)
239211
if _, exists := producers[id]; exists {
240-
results = append(results, k)
212+
results = append(results, k.(Registration))
241213
}
242-
}
214+
215+
return true
216+
})
217+
218+
r.RUnlock()
219+
243220
return results
244221
}
245222

0 commit comments

Comments
 (0)