Skip to content

Commit d6d5c3c

Browse files
Harshil goelharshil-goel
Harshil goel
authored andcommitted
Add single key call
Currently namespace used by unique generator uses edge predicate to validate. This shouldn't be the case, we should use the namespace provided in context.
1 parent a274431 commit d6d5c3c

File tree

4 files changed

+243
-35
lines changed

4 files changed

+243
-35
lines changed

posting/list.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,6 +1393,58 @@ func (l *List) GetLangTags(readTs uint64) ([]string, error) {
13931393
hex.EncodeToString(l.key))
13941394
}
13951395

1396+
func (l *List) StaticValue(readTs uint64) (*pb.PostingList, error) {
1397+
l.RLock()
1398+
defer l.RUnlock()
1399+
1400+
return l.StaticValueWithLockHeld(readTs)
1401+
}
1402+
1403+
func (l *List) StaticValueWithLockHeld(readTs uint64) (*pb.PostingList, error) {
1404+
val, found, err := l.findStaticValue(readTs, math.MaxUint64)
1405+
if err != nil {
1406+
return val, errors.Wrapf(err,
1407+
"cannot retrieve default value from list with key %s", hex.EncodeToString(l.key))
1408+
}
1409+
if !found {
1410+
return val, ErrNoValue
1411+
}
1412+
return val, nil
1413+
}
1414+
1415+
func (l *List) findStaticValue(readTs, uid uint64) (*pb.PostingList, bool, error) {
1416+
l.AssertRLock()
1417+
1418+
mutation, ok := l.mutationMap[readTs]
1419+
if ok {
1420+
return mutation, true, nil
1421+
}
1422+
1423+
if l.maxTs < readTs {
1424+
mutation, ok = l.mutationMap[l.maxTs]
1425+
if ok {
1426+
return mutation, true, nil
1427+
}
1428+
}
1429+
1430+
if len(l.mutationMap) != 0 {
1431+
for ts, mutation_i := range l.mutationMap {
1432+
if ts <= readTs {
1433+
mutation = mutation_i
1434+
} else {
1435+
break
1436+
}
1437+
}
1438+
return mutation, true, nil
1439+
}
1440+
1441+
if len(l.plist.Postings) > 0 {
1442+
return l.plist, true, nil
1443+
}
1444+
1445+
return nil, false, nil
1446+
}
1447+
13961448
// Value returns the default value from the posting list. The default value is
13971449
// defined as the value without a language tag.
13981450
// Value cannot be used to read from cache

posting/list_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
450450
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
451451
}
452452

453+
func TestReadSingleValue(t *testing.T) {
454+
defer setMaxListSize(maxListSize)
455+
maxListSize = math.MaxInt32
456+
457+
// We call pl.Iterate and then stop iterating in the first loop when we are reading
458+
// single values. This test confirms that the two functions, getFirst from this file
459+
// and GetSingeValueForKey works without an issue.
460+
461+
key := x.DataKey(x.GalaxyAttr("value"), 1240)
462+
ol, err := getNew(key, ps, math.MaxUint64)
463+
require.NoError(t, err)
464+
N := int(10000)
465+
for i := 2; i <= N; i += 2 {
466+
edge := &pb.DirectedEdge{
467+
Value: []byte("ho hey there" + strconv.Itoa(i)),
468+
}
469+
txn := Txn{StartTs: uint64(i)}
470+
addMutationHelper(t, ol, edge, Set, &txn)
471+
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
472+
kData := ol.getMutation(uint64(i))
473+
writer := NewTxnWriter(pstore)
474+
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
475+
require.NoError(t, err)
476+
}
477+
writer.Flush()
478+
479+
if i%10 == 0 {
480+
// Do frequent rollups, and store data in old timestamp
481+
kvs, err := ol.Rollup(nil, txn.StartTs-3)
482+
require.NoError(t, err)
483+
require.NoError(t, writePostingListToDisk(kvs))
484+
ol, err = getNew(key, ps, math.MaxUint64)
485+
require.NoError(t, err)
486+
}
487+
488+
j := 2
489+
if j < int(ol.minTs) {
490+
j = int(ol.minTs)
491+
}
492+
for ; j < i+6; j++ {
493+
tx := NewTxn(uint64(j))
494+
k, err := tx.cache.GetSinglePosting(key)
495+
require.NoError(t, err)
496+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
497+
}
498+
}
499+
}
500+
453501
func TestRollupMaxTsIsSet(t *testing.T) {
454502
defer setMaxListSize(maxListSize)
455503
maxListSize = math.MaxInt32

posting/lists.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,80 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
322322
return lc.SetIfAbsent(skey, pl), nil
323323
}
324324

325+
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
326+
// given key. This is used for retrieving the value of a scalar predicats.
327+
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
328+
getList := func() *pb.PostingList {
329+
lc.RLock()
330+
331+
pl := &pb.PostingList{}
332+
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
333+
err := pl.Unmarshal(delta)
334+
if err == nil {
335+
lc.RUnlock()
336+
return pl
337+
}
338+
}
339+
340+
l := lc.plists[string(key)]
341+
lc.RUnlock()
342+
343+
if l != nil {
344+
pl, err := l.StaticValue(lc.startTs)
345+
if err != nil {
346+
return pl
347+
}
348+
}
349+
350+
return nil
351+
}
352+
353+
getPostings := func() (*pb.PostingList, error) {
354+
pl := getList()
355+
if pl != nil {
356+
return pl, nil
357+
}
358+
359+
pl = &pb.PostingList{}
360+
txn := pstore.NewTransactionAt(lc.startTs, false)
361+
item, err := txn.Get(key)
362+
if err != nil {
363+
return nil, err
364+
}
365+
366+
err = item.Value(func(val []byte) error {
367+
if err := pl.Unmarshal(val); err != nil {
368+
return err
369+
}
370+
return nil
371+
})
372+
373+
return pl, err
374+
}
375+
376+
pl, err := getPostings()
377+
if err == badger.ErrKeyNotFound {
378+
return nil, nil
379+
}
380+
if err != nil {
381+
return nil, err
382+
}
383+
384+
// Filter and remove STAR_ALL and OP_DELETE Postings
385+
idx := 0
386+
for _, postings := range pl.Postings {
387+
if hasDeleteAll(postings) {
388+
return nil, nil
389+
}
390+
if postings.Op != Del {
391+
pl.Postings[idx] = postings
392+
idx++
393+
}
394+
}
395+
pl.Postings = pl.Postings[:idx]
396+
return pl, nil
397+
}
398+
325399
// Get retrieves the cached version of the list associated with the given key.
326400
func (lc *LocalCache) Get(key []byte) (*List, error) {
327401
return lc.getInternal(key, true)

worker/task.go

Lines changed: 69 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
420420
outputs := make([]*pb.Result, numGo)
421421
listType := schema.State().IsList(q.Attr)
422422

423+
// These are certain special cases where we can get away with reading only the latest value
424+
// Lang doesn't work because we would be storing various different languages at various
425+
// time. So when we go to read the latest value, we might get a different language.
426+
// Similarly with DoCount and ExpandAll and Facets. List types are also not supported
427+
// because list is stored by time, and we combine all the list items at various timestamps.
428+
hasLang := schema.State().HasLang(q.Attr)
429+
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil
430+
423431
calculate := func(start, end int) error {
424432
x.AssertTrue(start%width == 0)
425433
out := &pb.Result{}
@@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
434442
key := x.DataKey(q.Attr, q.UidList.Uids[i])
435443

436444
// Get or create the posting list for an entity, attribute combination.
437-
pl, err := qs.cache.Get(key)
438-
if err != nil {
439-
return err
440-
}
441445

442-
// If count is being requested, there is no need to populate value and facets matrix.
443-
if q.DoCount {
444-
count, err := countForValuePostings(args, pl, facetsTree, listType)
445-
if err != nil && err != posting.ErrNoValue {
446+
var vals []types.Val
447+
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored
448+
449+
if !getMultiplePosting {
450+
pl, err := qs.cache.GetSinglePosting(key)
451+
if err != nil {
452+
return err
453+
}
454+
if pl == nil || len(pl.Postings) == 0 {
455+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
456+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
457+
out.ValueMatrix = append(out.ValueMatrix,
458+
&pb.ValueList{Values: []*pb.TaskValue{}})
459+
continue
460+
}
461+
vals = make([]types.Val, len(pl.Postings))
462+
for i, p := range pl.Postings {
463+
vals[i] = types.Val{
464+
Tid: types.TypeID(p.ValType),
465+
Value: p.Value,
466+
}
467+
}
468+
} else {
469+
pl, err := qs.cache.Get(key)
470+
if err != nil {
446471
return err
447472
}
448-
out.Counts = append(out.Counts, uint32(count))
449-
// Add an empty UID list to make later processing consistent.
450-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
451-
continue
452-
}
453473

454-
vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
455-
switch {
456-
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
457-
// This branch is taken when the value does not exist in the pl or
458-
// the number of values retrieved is zero (there could still be facets).
459-
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
460-
// LangMatrix so that all these data structure have predictable layouts.
461-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
462-
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
463-
out.ValueMatrix = append(out.ValueMatrix,
464-
&pb.ValueList{Values: []*pb.TaskValue{}})
465-
if q.ExpandAll {
466-
// To keep the cardinality same as that of ValueMatrix.
467-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
474+
// If count is being requested, there is no need to populate value and facets matrix.
475+
if q.DoCount {
476+
count, err := countForValuePostings(args, pl, facetsTree, listType)
477+
if err != nil && err != posting.ErrNoValue {
478+
return err
479+
}
480+
out.Counts = append(out.Counts, uint32(count))
481+
// Add an empty UID list to make later processing consistent.
482+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
483+
continue
468484
}
469-
continue
470-
case err != nil:
471-
return err
472-
}
473485

474-
if q.ExpandAll {
475-
langTags, err := pl.GetLangTags(args.q.ReadTs)
476-
if err != nil {
486+
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)
487+
488+
switch {
489+
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
490+
// This branch is taken when the value does not exist in the pl or
491+
// the number of values retrieved is zero (there could still be facets).
492+
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
493+
// LangMatrix so that all these data structure have predictable layouts.
494+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
495+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
496+
out.ValueMatrix = append(out.ValueMatrix,
497+
&pb.ValueList{Values: []*pb.TaskValue{}})
498+
if q.ExpandAll {
499+
// To keep the cardinality same as that of ValueMatrix.
500+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
501+
}
502+
continue
503+
case err != nil:
477504
return err
478505
}
479-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
506+
507+
if q.ExpandAll {
508+
langTags, err := pl.GetLangTags(args.q.ReadTs)
509+
if err != nil {
510+
return err
511+
}
512+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
513+
}
480514
}
481515

482516
uidList := new(pb.List)

0 commit comments

Comments
 (0)