Skip to content

Commit 7ba5300

Browse files
author
Harshil Goel
committed
Tried to add posting list cache for single values
1 parent c1bd839 commit 7ba5300

File tree

2 files changed

+60
-4
lines changed

2 files changed

+60
-4
lines changed

posting/lists.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ type LocalCache struct {
111111

112112
// plists are posting lists in memory. They can be discarded to reclaim space.
113113
plists map[string]*List
114+
115+
postings map[string]*pb.PostingList
114116
}
115117

116118
// NewLocalCache returns a new LocalCache instance.
@@ -120,13 +122,17 @@ func NewLocalCache(startTs uint64) *LocalCache {
120122
deltas: make(map[string][]byte),
121123
plists: make(map[string]*List),
122124
maxVersions: make(map[string]uint64),
125+
postings: make(map[string]*pb.PostingList),
123126
}
124127
}
125128

126129
// NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs
127130
// around.
128131
func NoCache(startTs uint64) *LocalCache {
129-
return &LocalCache{startTs: startTs}
132+
return &LocalCache{
133+
startTs: startTs,
134+
postings: make(map[string]*pb.PostingList),
135+
}
130136
}
131137

132138
func (lc *LocalCache) getNoStore(key string) *List {
@@ -138,6 +144,20 @@ func (lc *LocalCache) getNoStore(key string) *List {
138144
return nil
139145
}
140146

147+
// SetIfAbsent adds the list for the specified key to the cache. If a list for the same
148+
// key already exists, the cache will not be modified and the existing list
149+
// will be returned instead. This behavior is meant to prevent the goroutines
150+
// using the cache from ending up with an orphaned version of a list.
151+
func (lc *LocalCache) SetPostingIfAbsent(key string, updated *pb.PostingList) *pb.PostingList {
152+
lc.Lock()
153+
defer lc.Unlock()
154+
if pl, ok := lc.postings[key]; ok {
155+
return pl
156+
}
157+
lc.postings[key] = updated
158+
return updated
159+
}
160+
141161
// SetIfAbsent adds the list for the specified key to the cache. If a list for the same
142162
// key already exists, the cache will not be modified and the existing list
143163
// will be returned instead. This behavior is meant to prevent the goroutines
@@ -200,8 +220,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
200220
remaining_keys := make([][]byte, 0)
201221
lc.RLock()
202222
for i, key := range keys {
203-
pl := &pb.PostingList{}
204-
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
223+
if pl, ok := lc.postings[string(key)]; ok && pl != nil {
224+
results[i] = pl
225+
} else if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
226+
pl := &pb.PostingList{}
205227
err := pl.Unmarshal(delta)
206228
if err != nil {
207229
results[i] = pl
@@ -214,6 +236,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
214236

215237
txn := pstore.NewTransactionAt(lc.startTs, false)
216238
items, err := txn.GetBatch(remaining_keys)
239+
if err != nil {
240+
fmt.Println(err, keys)
241+
return nil, err
242+
}
217243
idx := 0
218244

219245
for i := 0; i < len(results); i++ {
@@ -245,6 +271,7 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
245271
}
246272
pl.Postings = pl.Postings[:idx]
247273
results[i] = pl
274+
lc.SetPostingIfAbsent(string(keys[i]), pl)
248275
}
249276

250277
return results, err
@@ -311,6 +338,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
311338
}
312339
}
313340
pl.Postings = pl.Postings[:idx]
341+
lc.SetPostingIfAbsent(string(key), pl)
314342
return pl, nil
315343
}
316344

worker/sort.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/hex"
2222
"sort"
2323
"strings"
24+
"time"
2425

2526
"github.com/golang/glog"
2627
"github.com/pkg/errors"
@@ -511,7 +512,34 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error
511512
cctx, cancel := context.WithCancel(ctx)
512513
defer cancel()
513514

514-
r := sortWithIndex(cctx, ts)
515+
resCh := make(chan *sortresult, 2)
516+
go func() {
517+
select {
518+
case <-time.After(3 * time.Millisecond):
519+
// Wait between ctx chan and time chan.
520+
case <-ctx.Done():
521+
resCh <- &sortresult{err: ctx.Err()}
522+
return
523+
}
524+
r := sortWithoutIndex(cctx, ts)
525+
resCh <- r
526+
}()
527+
528+
go func() {
529+
sr := sortWithIndex(cctx, ts)
530+
resCh <- sr
531+
}()
532+
533+
r := <-resCh
534+
if r.err == nil {
535+
cancel()
536+
// wait for other goroutine to get cancelled
537+
<-resCh
538+
} else {
539+
span.Annotatef(nil, "processSort error: %v", r.err)
540+
r = <-resCh
541+
}
542+
515543
if r.err != nil {
516544
return nil, r.err
517545
}

0 commit comments

Comments
 (0)