Skip to content

Commit 191f210

Browse files
author
Harshil Goel
committed
minor refactor
1 parent 7fb3a81 commit 191f210

File tree

3 files changed

+40
-169
lines changed

3 files changed

+40
-169
lines changed

posting/list_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -475,10 +475,10 @@ func TestReadSingleValue(t *testing.T) {
475475
j = int(ol.minTs)
476476
}
477477
for ; j < i+6; j++ {
478-
k, err := GetSingleValueForKey(key, uint64(j))
478+
tx := NewTxn(uint64(j))
479+
k, err := tx.cache.GetSinglePosting(key)
479480
require.NoError(t, err)
480-
p := getFirst(t, k, uint64(j))
481-
checkValue(t, ol, string(p.Value), uint64(j))
481+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
482482
}
483483

484484
}

posting/lists.go

+37-91
Original file line numberDiff line numberDiff line change
@@ -152,64 +152,12 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
152152
return updated
153153
}
154154

155-
func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, error) {
156-
getNewPlistNil := func() (*List, error) {
157-
lc.RLock()
158-
defer lc.RUnlock()
159-
if lc.plists == nil {
160-
pl, err := GetSingleValueForKey(key, lc.startTs)
161-
return pl, err
162-
}
163-
return nil, nil
164-
}
165-
166-
if l, err := getNewPlistNil(); l != nil || err != nil {
167-
return l, err
168-
}
169-
170-
skey := string(key)
171-
if pl := lc.getNoStore(skey); pl != nil {
172-
return pl, nil
173-
}
174-
175-
var pl *List
176-
var err error
177-
if readFromDisk {
178-
pl, err = GetSingleValueForKey(key, lc.startTs)
179-
} else {
180-
pl = &List{
181-
key: key,
182-
plist: new(pb.PostingList),
183-
}
184-
}
185-
186-
// If we just brought this posting list into memory and we already have a delta for it, let's
187-
// apply it before returning the list.
188-
lc.RLock()
189-
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
190-
pl.setMutation(lc.startTs, delta)
191-
}
192-
lc.RUnlock()
193-
return pl, err
194-
}
195-
196155
func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
197156
getNewPlistNil := func() (*List, error) {
198157
lc.RLock()
199158
defer lc.RUnlock()
200159
if lc.plists == nil {
201-
if readFromDisk {
202-
return getNew(key, pstore, lc.startTs)
203-
} else {
204-
pl := &List{
205-
key: key,
206-
plist: new(pb.PostingList),
207-
}
208-
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
209-
pl.setMutation(lc.startTs, delta)
210-
}
211-
return pl, nil
212-
}
160+
return getNew(key, pstore, lc.startTs)
213161
}
214162
return nil, nil
215163
}
@@ -248,60 +196,58 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
248196
}
249197

250198
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
251-
pl := &pb.PostingList{}
252-
validatePl := func() {
253-
i := 0
254-
for _, postings := range pl.Postings {
255-
if hasDeleteAll(postings) {
256-
pl = nil
257-
return
258-
}
259-
if postings.Op != Del {
260-
pl.Postings[i] = postings
261-
i++
199+
200+
getPostings := func() (*pb.PostingList, error) {
201+
pl := &pb.PostingList{}
202+
lc.RLock()
203+
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
204+
err := pl.Unmarshal(delta)
205+
if err != nil {
206+
lc.RUnlock()
207+
return pl, nil
262208
}
263209
}
264-
pl.Postings = pl.Postings[:i]
265-
}
266-
lc.RLock()
267-
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
268-
err := pl.Unmarshal(delta)
269210
lc.RUnlock()
211+
212+
txn := pstore.NewTransactionAt(lc.startTs, false)
213+
item, err := txn.Get(key)
270214
if err != nil {
271-
validatePl()
272-
return pl, nil
215+
return pl, err
273216
}
274-
} else {
275-
lc.RUnlock()
276-
}
277217

278-
txn := pstore.NewTransactionAt(lc.startTs, false)
279-
item, err := txn.Get(key)
280-
if err != nil {
281-
validatePl()
218+
err = item.Value(func(val []byte) error {
219+
if err := pl.Unmarshal(val); err != nil {
220+
return err
221+
}
222+
return nil
223+
})
224+
282225
return pl, err
283226
}
284227

285-
err = item.Value(func(val []byte) error {
286-
if err := pl.Unmarshal(val); err != nil {
287-
return err
288-
}
289-
return nil
290-
})
291-
228+
pl, err := getPostings()
229+
if err == badger.ErrKeyNotFound {
230+
err = nil
231+
}
292232
if err != nil {
293-
validatePl()
294233
return pl, err
295234
}
296235

297-
validatePl()
236+
// Filter and remove STAR_ALL and OP_DELETE Postings
237+
idx := 0
238+
for _, postings := range pl.Postings {
239+
if hasDeleteAll(postings) {
240+
return nil, nil
241+
}
242+
if postings.Op != Del {
243+
pl.Postings[idx] = postings
244+
idx++
245+
}
246+
}
247+
pl.Postings = pl.Postings[:idx]
298248
return pl, nil
299249
}
300250

301-
func (lc *LocalCache) GetSingle(key []byte) (*List, error) {
302-
return lc.getSingleInternal(key, true)
303-
}
304-
305251
// Get retrieves the cached version of the list associated with the given key.
306252
func (lc *LocalCache) Get(key []byte) (*List, error) {
307253
return lc.getInternal(key, true)

posting/mvcc.go

-75
Original file line numberDiff line numberDiff line change
@@ -457,81 +457,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
457457
return l, nil
458458
}
459459

460-
func GetSingleValueForKey(key []byte, readTs uint64) (*List, error) {
461-
cachedVal, ok := lCache.Get(key)
462-
if ok {
463-
l, ok := cachedVal.(*List)
464-
if ok && l != nil {
465-
// No need to clone the immutable layer or the key since mutations will not modify it.
466-
lCopy := &List{
467-
minTs: l.minTs,
468-
maxTs: l.maxTs,
469-
key: key,
470-
plist: l.plist,
471-
}
472-
l.RLock()
473-
if l.mutationMap != nil {
474-
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
475-
for ts, pl := range l.mutationMap {
476-
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
477-
}
478-
}
479-
l.RUnlock()
480-
return lCopy, nil
481-
}
482-
}
483-
484-
if pstore.IsClosed() {
485-
return nil, badger.ErrDBClosed
486-
}
487-
488-
l := new(List)
489-
l.key = key
490-
l.plist = new(pb.PostingList)
491-
492-
txn := pstore.NewTransactionAt(readTs, false)
493-
item, err := txn.Get(key)
494-
if err != nil {
495-
return l, err
496-
}
497-
498-
l.maxTs = x.Max(l.maxTs, item.Version())
499-
500-
switch item.UserMeta() {
501-
case BitEmptyPosting:
502-
l.minTs = item.Version()
503-
case BitCompletePosting:
504-
if err := unmarshalOrCopy(l.plist, item); err != nil {
505-
return l, nil
506-
}
507-
l.minTs = item.Version()
508-
509-
case BitDeltaPosting:
510-
err := item.Value(func(val []byte) error {
511-
pl := &pb.PostingList{}
512-
if err := pl.Unmarshal(val); err != nil {
513-
return err
514-
}
515-
pl.CommitTs = item.Version()
516-
for _, mpost := range pl.Postings {
517-
// commitTs, startTs are meant to be only in memory, not
518-
// stored on disk.
519-
mpost.CommitTs = item.Version()
520-
}
521-
if l.mutationMap == nil {
522-
l.mutationMap = make(map[uint64]*pb.PostingList)
523-
}
524-
l.mutationMap[pl.CommitTs] = pl
525-
return nil
526-
})
527-
if err != nil {
528-
return l, nil
529-
}
530-
}
531-
532-
return l, nil
533-
}
534-
535460
func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
536461
cachedVal, ok := lCache.Get(key)
537462
if ok {

0 commit comments

Comments
 (0)