Skip to content

Commit 873fddf

Browse files
Harshil goelharshil-goel
Harshil goel
authored andcommitted
add single key call
Currently namespace used by unique generator uses edge predicate to validate. This shouldn't be the case, we should use the namespace provided in context.
1 parent 8e79c04 commit 873fddf

File tree

3 files changed

+188
-35
lines changed

3 files changed

+188
-35
lines changed

posting/list_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
450450
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
451451
}
452452

453+
func TestReadSingleValue(t *testing.T) {
454+
defer setMaxListSize(maxListSize)
455+
maxListSize = math.MaxInt32
456+
457+
// We call pl.Iterate and then stop iterating in the first loop when we are reading
458+
// single values. This test confirms that the two functions, getFirst from this file
459+
// and GetSingeValueForKey works without an issue.
460+
461+
key := x.DataKey(x.GalaxyAttr("value"), 1240)
462+
ol, err := getNew(key, ps, math.MaxUint64)
463+
require.NoError(t, err)
464+
N := int(10000)
465+
for i := 2; i <= N; i += 2 {
466+
edge := &pb.DirectedEdge{
467+
Value: []byte("ho hey there" + strconv.Itoa(i)),
468+
}
469+
txn := Txn{StartTs: uint64(i)}
470+
addMutationHelper(t, ol, edge, Set, &txn)
471+
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
472+
kData := ol.getMutation(uint64(i))
473+
writer := NewTxnWriter(pstore)
474+
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
475+
require.NoError(t, err)
476+
}
477+
writer.Flush()
478+
479+
if i%10 == 0 {
480+
// Do frequent rollups, and store data in old timestamp
481+
kvs, err := ol.Rollup(nil, txn.StartTs-3)
482+
require.NoError(t, err)
483+
require.NoError(t, writePostingListToDisk(kvs))
484+
ol, err = getNew(key, ps, math.MaxUint64)
485+
require.NoError(t, err)
486+
}
487+
488+
j := 2
489+
if j < int(ol.minTs) {
490+
j = int(ol.minTs)
491+
}
492+
for ; j < i+6; j++ {
493+
tx := NewTxn(uint64(j))
494+
k, err := tx.cache.GetSinglePosting(key)
495+
require.NoError(t, err)
496+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
497+
}
498+
}
499+
}
500+
453501
func TestRollupMaxTsIsSet(t *testing.T) {
454502
defer setMaxListSize(maxListSize)
455503
maxListSize = math.MaxInt32

posting/lists.go

+71
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,77 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
322322
return lc.SetIfAbsent(skey, pl), nil
323323
}
324324

325+
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
326+
// given key. This is used for retrieving the value of a scalar predicats.
327+
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
328+
getList := func() *pb.PostingList {
329+
lc.RLock()
330+
defer lc.RUnlock()
331+
332+
pl := &pb.PostingList{}
333+
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
334+
err := pl.Unmarshal(delta)
335+
if err == nil {
336+
return pl
337+
}
338+
}
339+
340+
l := lc.plists[string(key)]
341+
if l != nil {
342+
// If the current transaction is updating it, read it from here.
343+
// Otherwise read it from disk. TODO see if this can be fixed.
344+
return l.mutationMap[lc.startTs]
345+
}
346+
347+
return nil
348+
}
349+
350+
getPostings := func() (*pb.PostingList, error) {
351+
pl := getList()
352+
if pl != nil {
353+
return pl, nil
354+
}
355+
356+
pl = &pb.PostingList{}
357+
txn := pstore.NewTransactionAt(lc.startTs, false)
358+
item, err := txn.Get(key)
359+
if err != nil {
360+
return nil, err
361+
}
362+
363+
err = item.Value(func(val []byte) error {
364+
if err := pl.Unmarshal(val); err != nil {
365+
return err
366+
}
367+
return nil
368+
})
369+
370+
return pl, err
371+
}
372+
373+
pl, err := getPostings()
374+
if err == badger.ErrKeyNotFound {
375+
return nil, nil
376+
}
377+
if err != nil {
378+
return nil, err
379+
}
380+
381+
// Filter and remove STAR_ALL and OP_DELETE Postings
382+
idx := 0
383+
for _, postings := range pl.Postings {
384+
if hasDeleteAll(postings) {
385+
return nil, nil
386+
}
387+
if postings.Op != Del {
388+
pl.Postings[idx] = postings
389+
idx++
390+
}
391+
}
392+
pl.Postings = pl.Postings[:idx]
393+
return pl, nil
394+
}
395+
325396
// Get retrieves the cached version of the list associated with the given key.
326397
func (lc *LocalCache) Get(key []byte) (*List, error) {
327398
return lc.getInternal(key, true)

worker/task.go

+69-35
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
420420
outputs := make([]*pb.Result, numGo)
421421
listType := schema.State().IsList(q.Attr)
422422

423+
// These are certain special cases where we can get away with reading only the latest value
424+
// Lang doesn't work because we would be storing various different languages at various
425+
// time. So when we go to read the latest value, we might get a different language.
426+
// Similarly with DoCount and ExpandAll and Facets. List types are also not supported
427+
// because list is stored by time, and we combine all the list items at various timestamps.
428+
hasLang := schema.State().HasLang(q.Attr)
429+
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil
430+
423431
calculate := func(start, end int) error {
424432
x.AssertTrue(start%width == 0)
425433
out := &pb.Result{}
@@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
434442
key := x.DataKey(q.Attr, q.UidList.Uids[i])
435443

436444
// Get or create the posting list for an entity, attribute combination.
437-
pl, err := qs.cache.Get(key)
438-
if err != nil {
439-
return err
440-
}
441445

442-
// If count is being requested, there is no need to populate value and facets matrix.
443-
if q.DoCount {
444-
count, err := countForValuePostings(args, pl, facetsTree, listType)
445-
if err != nil && err != posting.ErrNoValue {
446+
var vals []types.Val
447+
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored
448+
449+
if !getMultiplePosting {
450+
pl, err := qs.cache.GetSinglePosting(key)
451+
if err != nil {
452+
return err
453+
}
454+
if pl == nil || len(pl.Postings) == 0 {
455+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
456+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
457+
out.ValueMatrix = append(out.ValueMatrix,
458+
&pb.ValueList{Values: []*pb.TaskValue{}})
459+
continue
460+
}
461+
vals = make([]types.Val, len(pl.Postings))
462+
for i, p := range pl.Postings {
463+
vals[i] = types.Val{
464+
Tid: types.TypeID(p.ValType),
465+
Value: p.Value,
466+
}
467+
}
468+
} else {
469+
pl, err := qs.cache.Get(key)
470+
if err != nil {
446471
return err
447472
}
448-
out.Counts = append(out.Counts, uint32(count))
449-
// Add an empty UID list to make later processing consistent.
450-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
451-
continue
452-
}
453473

454-
vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
455-
switch {
456-
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
457-
// This branch is taken when the value does not exist in the pl or
458-
// the number of values retrieved is zero (there could still be facets).
459-
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
460-
// LangMatrix so that all these data structure have predictable layouts.
461-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
462-
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
463-
out.ValueMatrix = append(out.ValueMatrix,
464-
&pb.ValueList{Values: []*pb.TaskValue{}})
465-
if q.ExpandAll {
466-
// To keep the cardinality same as that of ValueMatrix.
467-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
474+
// If count is being requested, there is no need to populate value and facets matrix.
475+
if q.DoCount {
476+
count, err := countForValuePostings(args, pl, facetsTree, listType)
477+
if err != nil && err != posting.ErrNoValue {
478+
return err
479+
}
480+
out.Counts = append(out.Counts, uint32(count))
481+
// Add an empty UID list to make later processing consistent.
482+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
483+
continue
468484
}
469-
continue
470-
case err != nil:
471-
return err
472-
}
473485

474-
if q.ExpandAll {
475-
langTags, err := pl.GetLangTags(args.q.ReadTs)
476-
if err != nil {
486+
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)
487+
488+
switch {
489+
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
490+
// This branch is taken when the value does not exist in the pl or
491+
// the number of values retrieved is zero (there could still be facets).
492+
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
493+
// LangMatrix so that all these data structure have predictable layouts.
494+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
495+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
496+
out.ValueMatrix = append(out.ValueMatrix,
497+
&pb.ValueList{Values: []*pb.TaskValue{}})
498+
if q.ExpandAll {
499+
// To keep the cardinality same as that of ValueMatrix.
500+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
501+
}
502+
continue
503+
case err != nil:
477504
return err
478505
}
479-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
506+
507+
if q.ExpandAll {
508+
langTags, err := pl.GetLangTags(args.q.ReadTs)
509+
if err != nil {
510+
return err
511+
}
512+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
513+
}
480514
}
481515

482516
uidList := new(pb.List)

0 commit comments

Comments
 (0)