Skip to content

Commit 6fbd525

Browse files
Harshil Goelharshil-goel
Harshil Goel
authored andcommitted
Use get batch api badger
1 parent 8d744e6 commit 6fbd525

File tree

5 files changed

+204
-40
lines changed

5 files changed

+204
-40
lines changed

go.mod

+5-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module github.com/dgraph-io/dgraph
22

33
go 1.19
44

5+
replace github.com/dgraph-io/badger/v4 => ../badger
6+
57
require (
68
contrib.go.opencensus.io/exporter/jaeger v0.1.0
79
contrib.go.opencensus.io/exporter/prometheus v0.1.0
@@ -40,7 +42,7 @@ require (
4042
github.com/mitchellh/panicwrap v1.0.0
4143
github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b
4244
github.com/pkg/errors v0.9.1
43-
github.com/pkg/profile v1.2.1
45+
github.com/pkg/profile v1.7.0
4446
github.com/prometheus/client_golang v1.14.0
4547
github.com/soheilhy/cmux v0.1.4
4648
github.com/spf13/cast v1.3.0
@@ -85,10 +87,12 @@ require (
8587
github.com/eapache/go-resiliency v1.4.0 // indirect
8688
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
8789
github.com/eapache/queue v1.1.0 // indirect
90+
github.com/felixge/fgprof v0.9.3 // indirect
8891
github.com/frankban/quicktest v1.10.2 // indirect
8992
github.com/fsnotify/fsnotify v1.6.0 // indirect
9093
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
9194
github.com/google/flatbuffers v1.12.1 // indirect
95+
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
9296
github.com/hashicorp/errwrap v1.1.0 // indirect
9397
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
9498
github.com/hashicorp/go-multierror v1.1.1 // indirect

go.sum

+8-4
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
145145
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
146146
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
147147
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
148-
github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs=
149-
github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak=
150148
github.com/dgraph-io/dgo/v230 v230.0.1 h1:kR7gI7/ZZv0jtG6dnedNgNOCxe1cbSG8ekF+pNfReks=
151149
github.com/dgraph-io/dgo/v230 v230.0.1/go.mod h1:5FerO2h4LPOxR2XTkOAtqUUPaFdQ+5aBOHXPBJ3nT10=
152150
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
@@ -199,6 +197,8 @@ github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHj
199197
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
200198
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
201199
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
200+
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
201+
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
202202
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA=
203203
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
204204
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@@ -316,6 +316,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf
316316
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
317317
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
318318
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
319+
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y=
320+
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
319321
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
320322
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
321323
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -394,6 +396,7 @@ github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKe
394396
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
395397
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
396398
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
399+
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
397400
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
398401
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
399402
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -557,8 +560,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
557560
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
558561
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
559562
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
560-
github.com/pkg/profile v1.2.1 h1:F++O52m40owAmADcojzM+9gyjmMOY/T4oYJkgFDH8RE=
561-
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
563+
github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
564+
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
562565
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
563566
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
564567
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -899,6 +902,7 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w
899902
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
900903
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
901904
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
905+
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
902906
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
903907
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
904908
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

posting/list_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
435435
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
436436
}
437437

438+
func TestReadSingleValue(t *testing.T) {
439+
defer setMaxListSize(maxListSize)
440+
maxListSize = math.MaxInt32
441+
442+
// We call pl.Iterate and then stop iterating in the first loop when we are reading
443+
// single values. This test confirms that the two functions, getFirst from this file
444+
// and GetSingeValueForKey works without an issue.
445+
446+
key := x.DataKey(x.GalaxyAttr("value"), 1240)
447+
ol, err := getNew(key, ps, math.MaxUint64)
448+
require.NoError(t, err)
449+
N := int(10000)
450+
for i := 2; i <= N; i += 2 {
451+
edge := &pb.DirectedEdge{
452+
Value: []byte("ho hey there" + strconv.Itoa(i)),
453+
}
454+
txn := Txn{StartTs: uint64(i)}
455+
addMutationHelper(t, ol, edge, Set, &txn)
456+
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
457+
kData := ol.getMutation(uint64(i))
458+
writer := NewTxnWriter(pstore)
459+
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
460+
require.NoError(t, err)
461+
}
462+
writer.Flush()
463+
464+
if i%10 == 0 {
465+
// Do frequent rollups, and store data in old timestamp
466+
kvs, err := ol.Rollup(nil, txn.StartTs-3)
467+
require.NoError(t, err)
468+
require.NoError(t, writePostingListToDisk(kvs))
469+
ol, err = getNew(key, ps, math.MaxUint64)
470+
require.NoError(t, err)
471+
}
472+
473+
j := 2
474+
if j < int(ol.minTs) {
475+
j = int(ol.minTs)
476+
}
477+
for ; j < i+6; j++ {
478+
tx := NewTxn(uint64(j))
479+
k, err := tx.cache.GetSinglePosting(key)
480+
require.NoError(t, err)
481+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
482+
}
483+
}
484+
}
485+
438486
func TestRollupMaxTsIsSet(t *testing.T) {
439487
defer setMaxListSize(maxListSize)
440488
maxListSize = math.MaxInt32

posting/lists.go

+62
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,68 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
195195
return lc.SetIfAbsent(skey, pl), nil
196196
}
197197

198+
func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) {
199+
results := make([]*pb.PostingList, len(keys))
200+
remaining_keys := make([][]byte, 0)
201+
lc.RLock()
202+
for i, key := range keys {
203+
if pl, ok := lc.postings[string(key)]; ok && pl != nil {
204+
results[i] = pl
205+
} else if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
206+
pl := &pb.PostingList{}
207+
err := pl.Unmarshal(delta)
208+
if err != nil {
209+
results[i] = pl
210+
}
211+
} else {
212+
remaining_keys = append(remaining_keys, key)
213+
}
214+
}
215+
lc.RUnlock()
216+
217+
txn := pstore.NewTransactionAt(lc.startTs, false)
218+
items, err := txn.GetBatch(remaining_keys)
219+
if err != nil {
220+
fmt.Println(err, keys)
221+
return nil, err
222+
}
223+
idx := 0
224+
225+
for i := 0; i < len(results); i++ {
226+
if results[i] != nil {
227+
continue
228+
}
229+
pl := &pb.PostingList{}
230+
err = items[idx].Value(func(val []byte) error {
231+
if err := pl.Unmarshal(val); err != nil {
232+
return err
233+
}
234+
return nil
235+
})
236+
idx += 1
237+
results[i] = pl
238+
}
239+
240+
for i := 0; i < len(results); i++ {
241+
pl := results[i]
242+
idx := 0
243+
for _, postings := range pl.Postings {
244+
if hasDeleteAll(postings) {
245+
return nil, nil
246+
}
247+
if postings.Op != Del {
248+
pl.Postings[idx] = postings
249+
idx++
250+
}
251+
}
252+
pl.Postings = pl.Postings[:idx]
253+
results[i] = pl
254+
lc.SetPostingIfAbsent(string(keys[i]), pl)
255+
}
256+
257+
return results, err
258+
}
259+
198260
// Get retrieves the cached version of the list associated with the given key.
199261
func (lc *LocalCache) Get(key []byte) (*List, error) {
200262
return lc.getInternal(key, true)

worker/task.go

+81-35
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
377377
outputs := make([]*pb.Result, numGo)
378378
listType := schema.State().IsList(q.Attr)
379379

380+
// These are certain special cases where we can get away with reading only the latest value
381+
// Lang doesn't work because we would be storing various different languages at various
382+
// times. So when we go to read the latest value, we might get a different language.
383+
// Similarly with DoCount and ExpandAll and Facets. List types are also not supported
384+
// because list is stored by time, and we combine all the list items at various timestamps.
385+
hasLang := schema.State().HasLang(q.Attr)
386+
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil
387+
380388
calculate := func(start, end int) error {
381389
x.AssertTrue(start%width == 0)
382390
out := &pb.Result{}
383391
outputs[start/width] = out
384392

393+
cache := make([]*pb.PostingList, 0)
385394
for i := start; i < end; i++ {
386395
select {
387396
case <-ctx.Done():
@@ -391,49 +400,86 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
391400
key := x.DataKey(q.Attr, q.UidList.Uids[i])
392401

393402
// Get or create the posting list for an entity, attribute combination.
394-
pl, err := qs.cache.Get(key)
395-
if err != nil {
396-
return err
397-
}
398403

399-
// If count is being requested, there is no need to populate value and facets matrix.
400-
if q.DoCount {
401-
count, err := countForValuePostings(args, pl, facetsTree, listType)
402-
if err != nil && err != posting.ErrNoValue {
404+
var vals []types.Val
405+
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored
406+
407+
if !getMultiplePosting {
408+
if len(cache) == 0 {
409+
keys := make([][]byte, 10)
410+
keys[0] = key
411+
for j := i + 1; j < i+10 && j < end; j++ {
412+
keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j])
413+
}
414+
cache, err = qs.cache.GetBatchSinglePosting(keys)
415+
if err != nil {
416+
return err
417+
}
418+
}
419+
pl := cache[0]
420+
if len(cache) > 1 {
421+
cache = cache[1:]
422+
}
423+
if pl == nil || len(pl.Postings) == 0 {
424+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
425+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
426+
out.ValueMatrix = append(out.ValueMatrix,
427+
&pb.ValueList{Values: []*pb.TaskValue{}})
428+
continue
429+
}
430+
vals = make([]types.Val, len(pl.Postings))
431+
for i, p := range pl.Postings {
432+
vals[i] = types.Val{
433+
Tid: types.TypeID(p.ValType),
434+
Value: p.Value,
435+
}
436+
}
437+
} else {
438+
pl, err := qs.cache.Get(key)
439+
if err != nil {
403440
return err
404441
}
405-
out.Counts = append(out.Counts, uint32(count))
406-
// Add an empty UID list to make later processing consistent.
407-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
408-
continue
409-
}
410442

411-
vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
412-
switch {
413-
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
414-
// This branch is taken when the value does not exist in the pl or
415-
// the number of values retrieved is zero (there could still be facets).
416-
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
417-
// LangMatrix so that all these data structure have predictable layouts.
418-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
419-
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
420-
out.ValueMatrix = append(out.ValueMatrix,
421-
&pb.ValueList{Values: []*pb.TaskValue{}})
422-
if q.ExpandAll {
423-
// To keep the cardinality same as that of ValueMatrix.
424-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
443+
// If count is being requested, there is no need to populate value and facets matrix.
444+
if q.DoCount {
445+
count, err := countForValuePostings(args, pl, facetsTree, listType)
446+
if err != nil && err != posting.ErrNoValue {
447+
return err
448+
}
449+
out.Counts = append(out.Counts, uint32(count))
450+
// Add an empty UID list to make later processing consistent.
451+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
452+
continue
425453
}
426-
continue
427-
case err != nil:
428-
return err
429-
}
430454

431-
if q.ExpandAll {
432-
langTags, err := pl.GetLangTags(args.q.ReadTs)
433-
if err != nil {
455+
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)
456+
457+
switch {
458+
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
459+
// This branch is taken when the value does not exist in the pl or
460+
// the number of values retrieved is zero (there could still be facets).
461+
// We add empty lists to the UidMatrix, FacetMatrix, ValueMatrix and
462+
// LangMatrix so that all these data structures have predictable layouts.
463+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
464+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
465+
out.ValueMatrix = append(out.ValueMatrix,
466+
&pb.ValueList{Values: []*pb.TaskValue{}})
467+
if q.ExpandAll {
468+
// To keep the cardinality same as that of ValueMatrix.
469+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
470+
}
471+
continue
472+
case err != nil:
434473
return err
435474
}
436-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
475+
476+
if q.ExpandAll {
477+
langTags, err := pl.GetLangTags(args.q.ReadTs)
478+
if err != nil {
479+
return err
480+
}
481+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
482+
}
437483
}
438484

439485
uidList := new(pb.List)

0 commit comments

Comments
 (0)