Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4203 pass RevSeqNo for on demand imports #7273

Open
wants to merge 11 commits into
base: release/anemone
Choose a base branch
from
59 changes: 45 additions & 14 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ func realDocID(docid string) string {
return docid
}

// getRevSeqNo fetches the revSeqNo for a document, using the virtual xattr if available. Returns the cas from this fetch.
func (c *DatabaseCollection) getRevSeqNo(ctx context.Context, docID string) (revSeqNo, cas uint64, err error) {
xattrs, cas, err := c.dataStore.GetXattrs(ctx, docID, []string{base.VirtualXattrRevSeqNo})
if err != nil {
return 0, 0, err
}
revSeqNo, err = unmarshalRevSeqNo(xattrs[base.VirtualXattrRevSeqNo])
return revSeqNo, cas, err
}

func (c *DatabaseCollection) GetDocument(ctx context.Context, docid string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
doc, _, err = c.GetDocumentWithRaw(ctx, docid, unmarshalLevel)
return doc, err
Expand All @@ -64,22 +74,29 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
if err != nil {
return nil, nil, err
}

isSgWrite, crc32Match, _ := doc.IsSGWrite(ctx, rawBucketDoc.Body)
if crc32Match {
c.dbStats().Database().Crc32MatchCount.Add(1)
}

// If existing doc wasn't an SG Write, import the doc.
if !isSgWrite {
var importErr error
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
if importErr != nil {
return nil, nil, importErr
// reload to get revseqno for on-demand import
doc, rawBucketDoc, err = c.getDocWithXattrs(ctx, key, append(c.syncGlobalSyncAndUserXattrKeys(), base.VirtualXattrRevSeqNo), unmarshalLevel)
if err != nil {
return nil, nil, err
}
// nil, nil returned when ErrImportCancelled is swallowed by importDoc switch
if doc == nil {
return nil, nil, base.ErrNotFound
isSgWrite, _, _ := doc.IsSGWrite(ctx, rawBucketDoc.Body)
if !isSgWrite {
var importErr error
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, doc.RevSeqNo, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
if importErr != nil {
return nil, nil, importErr
}
// nil, nil returned when ErrImportCancelled is swallowed by importDoc switch
if doc == nil {
return nil, nil, base.ErrNotFound
}
}
}
if !doc.HasValidSyncData() {
Expand Down Expand Up @@ -114,10 +131,16 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
return doc, rawBucketDoc, nil
}

// GetDocWithXattrs retrieves a document from the bucket, including sync gateway metadta xattrs, and the user xattr, if specified.
func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
return c.getDocWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys(), unmarshalLevel)
}

// GetDocWithXattrs retrieves a document from the bucket, including sync gateway metadta xattrs, and the user xattr, if specified. Arbitrary xattrs can be passed into this function to allow VirtualXattrRevSeqNo to be returned and set on Document.
func (c *DatabaseCollection) getDocWithXattrs(ctx context.Context, key string, xattrKeys []string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
rawBucketDoc = &sgbucket.BucketDocument{}
var getErr error
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys())
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, xattrKeys)
if getErr != nil {
return nil, nil, getErr
}
Expand Down Expand Up @@ -163,7 +186,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
if !isSgWrite {
var importErr error

doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, xattrs, cas)
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, doc.RevSeqNo, xattrs, cas)
if importErr != nil {
return emptySyncData, importErr
}
Expand Down Expand Up @@ -240,19 +263,18 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid

// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling
// if the document gets updated after the initial retrieval attempt that triggered this.
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) {
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, revSeqNo uint64, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) {
isDelete := rawDoc == nil
importDb := DatabaseCollectionWithUser{DatabaseCollection: c, user: nil}
var importErr error

importOpts := importDocOptions{
isDelete: isDelete,
mode: ImportOnDemand,
revSeqNo: 0, // pending work in CBG-4203
revSeqNo: revSeqNo,
expiry: nil,
}

// RevSeqNo is 0 here pending work in CBG-4203
docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, xattrs, importOpts, cas)

if importErr == base.ErrImportCancelledFilter {
Expand Down Expand Up @@ -905,7 +927,15 @@ func (db *DatabaseCollectionWithUser) backupAncestorRevs(ctx context.Context, do

// ////// UPDATING DOCUMENTS:

// OnDemandImportForWrite imports a document before a subsequent document is written to Sync Gateway on top of the import document. Returns base.ErrCasFailureShouldRetry in the case that this import nees to be retried. This function is expected to be called within a callback to WriteUpdateWithXattrs.
func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context, docid string, doc *Document, deleted bool) error {
revSeqNo, cas, err := db.getRevSeqNo(ctx, docid)
if err != nil {
return err
}
if cas != doc.Cas {
return base.ErrCasFailureShouldRetry
}

// Check whether the doc requiring import is an SDK delete
isDelete := false
Expand All @@ -921,7 +951,7 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context
expiry: nil,
mode: ImportOnDemand,
isDelete: isDelete,
revSeqNo: 0, // pending work in CBG-4203
revSeqNo: revSeqNo,
}
importedDoc, importErr := importDb.ImportDoc(ctx, docid, doc, importOpts) // nolint:staticcheck

Expand Down Expand Up @@ -2405,6 +2435,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
}

prevCurrentRev = doc.CurrentRev

// Check whether Sync Data originated in body
Expand Down
2 changes: 1 addition & 1 deletion db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
if c.useMou() {
xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo, base.GlobalXattrName)
xattrKeys = append(xattrKeys, base.MouXattrName, base.GlobalXattrName)
}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
Expand Down
45 changes: 30 additions & 15 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,19 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) {
return doc, nil
}

func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, virtualXattr []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {

func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, revSeqNo []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
if len(syncXattrData) == 0 && len(hlvXattrData) == 0 {
// If no xattr data, unmarshal as standard doc
doc, err = unmarshalDocument(docid, data)
if doc != nil {
doc.RevSeqNo, err = unmarshalRevSeqNo(revSeqNo)
if err != nil {
return nil, pkgerrors.WithStack(base.RedactErrorf("Failed convert rev seq number during UnmarshalWithXattrs() doc with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
} else {
doc = NewDocument(docid)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, virtualXattr, globalSyncData, unmarshalLevel)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, revSeqNo, globalSyncData, unmarshalLevel)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -1117,7 +1122,7 @@ func (doc *Document) MarshalJSON() (data []byte, err error) {
// unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent
// lazy unmarshalling as needed.
// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, virtualXattr []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, revSeqNo []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
if doc.ID == "" {
base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set")
return errors.New("Document was unmarshalled without ID set")
Expand All @@ -1140,18 +1145,11 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
if virtualXattr != nil {
var revSeqNo string
err := base.JSONUnmarshal(virtualXattr, &revSeqNo)
if revSeqNo != nil {
var err error
doc.RevSeqNo, err = unmarshalRevSeqNo(revSeqNo)
if err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal doc virtual revSeqNo xattr during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
if revSeqNo != "" {
revNo, err := strconv.ParseUint(revSeqNo, 10, 64)
if err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed convert rev seq number %q during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", revSeqNo, base.UD(doc.ID), err))
}
doc.RevSeqNo = revNo
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal RevSeqNo during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
if len(globalSyncData) > 0 {
Expand Down Expand Up @@ -1385,3 +1383,20 @@ func (s *SyncData) GetRevAndVersion() (rav channels.RevAndVersion) {
}
return rav
}

// unmarshalRevSeqNo unmarshals the rev seq number from the provided bytes, expects a string representation of the uint64.
func unmarshalRevSeqNo(revSeqNoBytes []byte) (uint64, error) {
if len(revSeqNoBytes) == 0 {
return 0, nil
}
var revSeqNoString string
err := base.JSONUnmarshal(revSeqNoBytes, &revSeqNoString)
if err != nil {
return 0, fmt.Errorf("Failed to unmarshal rev seq number %s", revSeqNoBytes)
}
revSeqNo, err := strconv.ParseUint(revSeqNoString, 10, 64)
if err != nil {
return 0, fmt.Errorf("Failed convert rev seq number %s", revSeqNoBytes)
}
return revSeqNo, nil
}
95 changes: 65 additions & 30 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ func TestFeedImport(t *testing.T) {

// fetch the xattrs directly doc to confirm import (to avoid triggering on-demand import)
var syncData SyncData
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName})
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName, base.VirtualXattrRevSeqNo})
require.NoError(t, err)
syncXattr, ok := xattrs[base.SyncXattrName]
require.True(t, ok)
require.NoError(t, base.JSONUnmarshal(syncXattr, &syncData))
require.NotZero(t, syncData.Sequence, "Sequence should not be zero for imported doc")
revSeqNo := RetrieveDocRevSeqNo(t, xattrs[base.VirtualXattrRevSeqNo])
require.NotZero(t, revSeqNo, "RevSeqNo should not be zero for imported doc")

// verify mou
xattrs, _, err = collection.dataStore.GetXattrs(ctx, key, []string{base.MouXattrName})
// verify mou and rev seqno
xattrs, _, err = collection.dataStore.GetXattrs(ctx, key, []string{base.MouXattrName, base.VirtualXattrRevSeqNo})
if db.UseMou() {
var mou *MetadataOnlyUpdate
require.NoError(t, err)
Expand All @@ -71,6 +73,8 @@ func TestFeedImport(t *testing.T) {
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
// curr revSeqNo should be 2, so prev revSeqNo is 1
require.Equal(t, revSeqNo-1, mou.PreviousRevSeqNo)
} else {
// Expect not found fetching mou xattr
require.Error(t, err)
Expand Down Expand Up @@ -107,6 +111,7 @@ func TestOnDemandImportMou(t *testing.T) {
require.NotNil(t, doc.MetadataOnlyUpdate)
require.Equal(t, base.CasToString(writeCas), doc.MetadataOnlyUpdate.PreviousHexCAS)
require.Equal(t, base.CasToString(doc.Cas), doc.MetadataOnlyUpdate.HexCAS)
require.Equal(t, uint64(1), doc.MetadataOnlyUpdate.PreviousRevSeqNo)
} else {
require.Nil(t, doc.MetadataOnlyUpdate)
}
Expand All @@ -115,34 +120,64 @@ func TestOnDemandImportMou(t *testing.T) {
// On-demand write
// Create via the SDK
t.Run("on-demand write", func(t *testing.T) {
writeKey := baseKey + "write"
bodyBytes := []byte(`{"foo":"bar"}`)
body := Body{}
err := body.Unmarshal(bodyBytes)
assert.NoError(t, err, "Error unmarshalling body")
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
writeCas, err := collection.dataStore.WriteCas(writeKey, 0, 0, bodyBytes, 0)
require.NoError(t, err)

// Update the document to trigger on-demand import. Write will be a conflict, but import should be performed
_, doc, err := collection.Put(ctx, writeKey, Body{"foo": "baz"})
require.Nil(t, doc)
assertHTTPError(t, err, 409)
for _, funcName := range []string{"Put", "PutExistingRev", "PutExistingCurrentVersion"} {
t.Run(funcName, func(t *testing.T) {
writeKey := baseKey + "_" + funcName
bodyBytes := []byte(`{"foo":"bar"}`)
body := Body{}
err := body.Unmarshal(bodyBytes)
assert.NoError(t, err, "Error unmarshalling body")
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
writeCas, err := collection.dataStore.WriteCas(writeKey, 0, 0, bodyBytes, 0)
require.NoError(t, err)

newDoc := &Document{
ID: writeKey,
}
newDoc.UpdateBodyBytes([]byte(`{"foo": "baz"}`))

_, rawBucketDoc, err := collection.GetDocumentWithRaw(ctx, writeKey, DocUnmarshalSync)
require.NoError(t, err)

switch funcName {
case "Put":
// Update the document to trigger on-demand import. Write will be a conflict, but import should be performed
_, doc, err := collection.Put(ctx, writeKey, Body{"foo": "baz"})
require.Nil(t, doc)
assertHTTPError(t, err, 409)
case "PutExistingRev":
fakeRevID := "1-abc"
docHistory := []string{fakeRevID}
noConflicts := true
forceAllowConflictingTombstone := false
_, _, err := collection.PutExistingRev(ctx, newDoc, docHistory, noConflicts, forceAllowConflictingTombstone, rawBucketDoc, ExistingVersionWithUpdateToHLV)
assertHTTPError(t, err, 409)
case "PutExistingCurrentVersion":
hlv := NewHybridLogicalVector()
var legacyRevList []string
_, _, _, err = collection.PutExistingCurrentVersion(ctx, newDoc, hlv, rawBucketDoc, legacyRevList)
assertHTTPError(t, err, 409)
default:
require.FailNow(t, fmt.Sprintf("unexpected funcName: %s", funcName))
}

// fetch the mou xattr directly doc to confirm import (to avoid triggering on-demand get import)
// verify mou
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, writeKey, []string{base.MouXattrName})
if db.UseMou() {
require.NoError(t, err)
mouXattr, mouOk := xattrs[base.MouXattrName]
var mou *MetadataOnlyUpdate
require.True(t, mouOk)
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
} else {
// expect not found fetching mou xattr
require.Error(t, err)
// fetch the mou xattr directly doc to confirm import (to avoid triggering on-demand get import)
// verify mou
xattrs, importCas, err := collection.dataStore.GetXattrs(ctx, writeKey, []string{base.MouXattrName})
if db.UseMou() {
require.NoError(t, err)
mouXattr, mouOk := xattrs[base.MouXattrName]
var mou *MetadataOnlyUpdate
require.True(t, mouOk)
require.NoError(t, base.JSONUnmarshal(mouXattr, &mou))
require.Equal(t, base.CasToString(writeCas), mou.PreviousHexCAS)
require.Equal(t, base.CasToString(importCas), mou.HexCAS)
require.Equal(t, uint64(1), mou.PreviousRevSeqNo)
} else {
// expect not found fetching mou xattr
require.Error(t, err)
}
})
}
})

Expand Down
4 changes: 0 additions & 4 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,6 @@ func (c *DatabaseCollection) GetDocumentCurrentVersion(t testing.TB, key string)

// retrieveDocRevSeNo will take the $document xattr and return the revSeqNo defined in that xattr
func RetrieveDocRevSeqNo(t *testing.T, docxattr []byte) uint64 {
// virtual xattr not implemented for rosmar CBG-4233
if base.UnitTestUrlIsWalrus() {
return 0
}
require.NotNil(t, docxattr)
var retrievedDocumentRevNo string
require.NoError(t, base.JSONUnmarshal(docxattr, &retrievedDocumentRevNo))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/couchbase/sg-bucket v0.0.0-20241018143914-45ef51a0c1be
github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338
github.com/couchbaselabs/gocbconnstr v1.0.5
github.com/couchbaselabs/rosmar v0.0.0-20241219222419-f9921fccab90
github.com/couchbaselabs/rosmar v0.0.0-20250110001838-ab0121bb9242
github.com/elastic/gosigar v0.14.3
github.com/felixge/fgprof v0.9.5
github.com/go-jose/go-jose/v4 v4.0.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/couchbaselabs/gocbconnstr v1.0.5 h1:e0JokB5qbcz7rfnxEhNRTKz8q1svoRvDo
github.com/couchbaselabs/gocbconnstr v1.0.5/go.mod h1:KV3fnIKMi8/AzX0O9zOrO9rofEqrRF1d2rG7qqjxC7o=
github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 h1:lhGOw8rNG6RAadmmaJAF3PJ7MNt7rFuWG7BHCYMgnGE=
github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E=
github.com/couchbaselabs/rosmar v0.0.0-20241219222419-f9921fccab90 h1:rQfOVEJvF8uGdRsWqNC4WZo5I6gOysFdE/1r0RyxEoE=
github.com/couchbaselabs/rosmar v0.0.0-20241219222419-f9921fccab90/go.mod h1:suZBurj14d2YtLOW8pBc8mjQN8MhPFHHgPbqX1fDDlE=
github.com/couchbaselabs/rosmar v0.0.0-20250110001838-ab0121bb9242 h1:Gb6jaSXG6KfJFT6zGmNaCK/Ljd+yVQIo7QifBhBF11Y=
github.com/couchbaselabs/rosmar v0.0.0-20250110001838-ab0121bb9242/go.mod h1:suZBurj14d2YtLOW8pBc8mjQN8MhPFHHgPbqX1fDDlE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Loading
Loading