Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.1.9 backport] CBG-4028 Treat on-demand import for GET errors as not found #6966

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,14 @@ func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid str
var importErr error

docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, rawXattr, rawUserXattr, isDelete, cas, nil, ImportOnDemand)

if importErr == base.ErrImportCancelledFilter {
// If the import was cancelled due to filter, treat as not found
return nil, base.HTTPErrorf(404, "Not imported")
// If the import was cancelled due to filter, treat as 404 not imported
return nil, base.HTTPErrorf(http.StatusNotFound, "Not imported")
} else if importErr != nil {
return nil, importErr
// Treat any other failure to perform an on-demand import as not found
base.DebugfCtx(ctx, base.KeyImport, "Unable to import doc %q during on demand import for get - will be treated as not found. Reason: %v", base.UD(docid), importErr)
return nil, base.HTTPErrorf(http.StatusNotFound, "Not found")
}
return docOut, nil
}
Expand Down
115 changes: 80 additions & 35 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2880,82 +2880,127 @@ func TestBlipRefreshUser(t *testing.T) {
require.NotContains(t, string(body), "Panic:")
}

// TestOnDemandImportBlipFailure turns off feed-based import to be able to trigger on-demand import
// during a blip pull replication, by:
// 1. Write a document that's accessible to the client, and force import of this doc
// 2. Update the document
// 3. Flush the rev cache
// 4. Perform a pull replication. Client will get a changes message for the initial revision of the document,
then SGW will detect the document needs to be imported when the rev is requested. This should trigger norev handling
// in the case where the import was unsuccessful
func TestOnDemandImportBlipFailure(t *testing.T) {
if !base.TestUseXattrs() {
t.Skip("Test performs import, not valid for non-xattr mode")
}
base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg)
rt := NewRestTester(t, &RestTesterConfig{PersistentConfig: true, GuestEnabled: true})

base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeyCache, base.KeyChanges)

syncFn := `function(doc) {
if (doc.invalid) {
throw("invalid document")
}
channel(doc.channel)
}`

importFilter := `function(doc) {
if (doc.doNotImport) {
return false
}
return true
}`

rt := NewRestTester(t, &RestTesterConfig{
SyncFn: syncFn,
ImportFilter: importFilter,
AutoImport: base.BoolPtr(false),
})
defer rt.Close()
config := rt.NewDbConfig()
config.AutoImport = false
RequireStatus(t, rt.CreateDatabase("db", config), http.StatusCreated)

testCases := []struct {
name string
invalidBody []byte
channel string // used to avoid cross-traffic between tests
updatedBody []byte
}{

{
name: "_id property",
invalidBody: []byte(`{"_id": "doc1"}`),
channel: "a",
updatedBody: []byte(`{"_id": "doc1", "channel":"a"}`),
},
{
name: "_exp property",
invalidBody: []byte(`{"_exp": 1}`),
channel: "b",
updatedBody: []byte(`{"_exp": 1, "channel":"b"}}`),
},
{
name: "_rev property",
invalidBody: []byte(`{"_rev": "abc1"}`),
channel: "c",
updatedBody: []byte(`{"_rev": "abc1", "channel":"c"}}`),
},
{
name: "_revisions property",
invalidBody: []byte(`{"_revisions": {"start": 0, "ids": ["foo", "def]"}}`),
channel: "d",
updatedBody: []byte(`{"_revisions": {"start": 0, "ids": ["foo", "def]"}, "channel":"d"}`),
},

{
name: "_purged property",
invalidBody: []byte(`{"_purged": true}`),
channel: "e",
updatedBody: []byte(`{"_purged": true, "channel":"e"}`),
},
{
name: "invalid json",
invalidBody: []byte(``),
channel: "f",
updatedBody: []byte(``),
},
{
name: "rejected by sync function",
channel: "g",
updatedBody: []byte(`{"invalid": true, "channel":"g"}`),
},
{
name: "rejected by import filter",
channel: "h",
updatedBody: []byte(`{"doNotImport": true, "channel":"h"}`),
},
}
for i, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
docID := fmt.Sprintf("doc%d,", i)
markerDoc := fmt.Sprintf("markerDoc%d", i)
validBody := `{"foo":"bar"}`
markerBody := `{"prop":true}`
_ = rt.PutDoc(docID, validBody)
btc, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{
Username: "user",
Channels: []string{"*"},
})
require.NoError(t, err)
defer btc.Close()
require.NoError(t, btc.StartOneshotPull())
docID := fmt.Sprintf("doc%d_%s,", i, testCase.name)
markerDoc := fmt.Sprintf("markerDoc%d_%s", i, testCase.name)
validBody := fmt.Sprintf(`{"foo":"bar", "channel":%q}`, testCase.channel)
username := fmt.Sprintf("user_%d", i)

output, found := btc.WaitForDoc(docID)
require.True(t, found)
require.JSONEq(t, validBody, string(output))
revID := rt.PutDoc(docID, validBody)

err = rt.GetSingleDataStore().SetRaw(docID, 0, nil, testCase.invalidBody)
// Wait for initial revision to arrive over DCP before mutating
require.NoError(t, rt.WaitForPendingChanges())

// Issue a changes request for the channel before updating the document, to ensure the valid revision is
// resident in the channel cache (query results may be unreliable in the case of the 'invalid json' update)
RequireStatus(t, rt.SendAdminRequest("GET", "/{{.keyspace}}/_changes?filter=sync_gateway/bychannel&channels="+testCase.channel, ""), 200)

err := rt.GetSingleDataStore().SetRaw(docID, 0, nil, testCase.updatedBody)
require.NoError(t, err)

rt.PutDoc(markerDoc, markerBody)
markerDocBody := fmt.Sprintf(`{"channel":%q}`, testCase.channel)
_ = rt.PutDoc(markerDoc, markerDocBody)

rt.GetSingleTestDatabaseCollection().FlushRevisionCacheForTest()

btc2, err := NewBlipTesterClientOptsWithRT(t, rt, &BlipTesterClientOpts{
Username: "user",
Channels: []string{"*"},
Username: username,
Channels: []string{testCase.channel},
})
require.NoError(t, err)
defer btc2.Close()

require.NoError(t, btc.StartOneshotPull())
require.NoError(t, btc2.StartOneshotPull())

btc.WaitForDoc(markerDoc)
btc2.WaitForDoc(markerDoc)

// Validate that the latest client message for the requested doc/rev was a norev
msg, ok := btc2.SingleCollection().GetBlipRevMessage(docID, revID.Rev)
require.True(t, ok)
require.Equal(t, db.MessageNoRev, msg.Profile())
})
}
}
17 changes: 16 additions & 1 deletion rest/blip_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,23 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
}

btr.bt.blipContext.HandlerForProfile[db.MessageNoRev] = func(msg *blip.Message) {
// TODO: Support norev messages
btr.storeMessage(msg)

btcr := btc.getCollectionClientFromMessage(msg)

docID := msg.Properties[db.NorevMessageId]
revID := msg.Properties[db.NorevMessageRev]

btcr.docsLock.Lock()
defer btcr.docsLock.Unlock()

if _, ok := btcr.docs[docID]; ok {
bodyMessagePair := &BodyMessagePair{message: msg}
btcr.docs[docID][revID] = bodyMessagePair
} else {
bodyMessagePair := &BodyMessagePair{message: msg}
btcr.docs[docID] = map[string]*BodyMessagePair{revID: bodyMessagePair}
}
}

btr.bt.blipContext.DefaultHandler = func(msg *blip.Message) {
Expand Down
4 changes: 2 additions & 2 deletions rest/importtest/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,7 @@ func TestOnDemandWriteImportReplacingNullDoc(t *testing.T) {

// Attempt to get the doc via Sync Gateway, triggering a cancelled on-demand import of the null document
response := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+key, "")
rest.RequireStatus(t, response, http.StatusBadRequest) // import attempted with empty body
rest.RequireStatus(t, response, http.StatusNotFound) // import attempted with empty body

// Attempt to update the doc via Sync Gateway, triggering on-demand import of the null document - should ignore empty body error and proceed with write
mobileBody := make(map[string]interface{})
Expand Down Expand Up @@ -2351,7 +2351,7 @@ func TestImportInternalPropertiesHandling(t *testing.T) {
name: "_purged true",
importBody: map[string]interface{}{"_purged": true},
expectReject: true,
expectedStatusCode: base.IntPtr(200), // Import gets cancelled and returns 200 and blank body
expectedStatusCode: base.IntPtr(404), // Import gets cancelled and returns not found
},
{
name: "_removed",
Expand Down
46 changes: 44 additions & 2 deletions rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
type RestTesterConfig struct {
GuestEnabled bool // If this is true, Admin Party is in full effect
SyncFn string // put the sync() function source in here (optional)
ImportFilter string // put the import filter function source in here (optional)
DatabaseConfig *DatabaseConfig // Supports additional config options. BucketConfig, Name, Sync, Unsupported will be ignored (overridden)
MutateStartupConfig func(config *StartupConfig) // Function to mutate the startup configuration before the server context gets created. This overrides options the RT sets.
InitSyncSeq uint64 // If specified, initializes _sync:seq on bucket creation. Not supported when running against walrus
Expand All @@ -60,6 +61,7 @@ type RestTesterConfig struct {
leakyBucketConfig *base.LeakyBucketConfig // Set to create and use a leaky bucket on the RT and DB. A test bucket cannot be passed in if using this option.
adminInterface string // adminInterface overrides the default admin interface.
SgReplicateEnabled bool // SgReplicateManager disabled by default for RestTester
AutoImport *bool
HideProductInfo bool
AdminInterfaceAuthentication bool
metricsInterfaceAuthentication bool
Expand Down Expand Up @@ -303,11 +305,14 @@ func (rt *RestTester) Bucket() base.Bucket {
// If scopes is already set, assume the caller has a plan
if rt.DatabaseConfig.Scopes == nil {
// Configure non default collections by default
var syncFn *string
var syncFn, importFilter *string
if rt.SyncFn != "" {
syncFn = base.StringPtr(rt.SyncFn)
}
rt.DatabaseConfig.Scopes = GetCollectionsConfigWithSyncFn(rt.TB, testBucket, syncFn, rt.numCollections)
if rt.ImportFilter != "" {
importFilter = base.StringPtr(rt.ImportFilter)
}
rt.DatabaseConfig.Scopes = GetCollectionsConfigWithFiltering(rt.TB, testBucket, rt.numCollections, syncFn, importFilter)
}
}

Expand All @@ -333,6 +338,10 @@ func (rt *RestTester) Bucket() base.Bucket {

rt.DatabaseConfig.SGReplicateEnabled = base.BoolPtr(rt.RestTesterConfig.SgReplicateEnabled)

// Check for override of AutoImport in the rt config
if rt.AutoImport != nil {
rt.DatabaseConfig.AutoImport = *rt.AutoImport
}
autoImport, _ := rt.DatabaseConfig.AutoImportEnabled(ctx)
if rt.DatabaseConfig.ImportPartitions == nil && base.TestUseXattrs() && base.IsEnterpriseEdition() && autoImport {
// Speed up test setup - most tests don't need more than one partition given we only have one node
Expand Down Expand Up @@ -407,6 +416,39 @@ func GetCollectionsConfigWithSyncFn(t testing.TB, testBucket *base.TestBucket, s
return scopesConfig
}

// GetCollectionsConfigWithFiltering builds a ScopesConfig from a TestBucket for use with
// non-default collections. The same (optional) sync function and import filter are applied
// to every configured collection.
func GetCollectionsConfigWithFiltering(t testing.TB, testBucket *base.TestBucket, numCollections int, syncFn *string, importFilter *string) ScopesConfig {
	// Datastores as provided by the test bucket
	stores := testBucket.GetNonDefaultDatastoreNames()
	require.True(t, len(stores) >= numCollections, "Requested more collections %d than found on testBucket %d", numCollections, len(stores))

	// Single shared config carrying the optional sync function and import filter; the same
	// pointer is reused for every collection, matching the original behaviour.
	collectionConfig := &CollectionConfig{}
	if syncFn != nil {
		collectionConfig.SyncFn = syncFn
	}
	if importFilter != nil {
		collectionConfig.ImportFilter = importFilter
	}

	scopesConfig := ScopesConfig{}
	for i := 0; i < numCollections; i++ {
		dataStoreName := stores[i]
		scopeConfig, scopeExists := scopesConfig[dataStoreName.ScopeName()]
		if !scopeExists {
			// First collection seen for this scope - create the scope entry with it.
			scopesConfig[dataStoreName.ScopeName()] = ScopeConfig{
				Collections: map[string]*CollectionConfig{
					dataStoreName.CollectionName(): collectionConfig,
				}}
			continue
		}
		if _, collectionExists := scopeConfig.Collections[dataStoreName.CollectionName()]; !collectionExists {
			scopeConfig.Collections[dataStoreName.CollectionName()] = collectionConfig
		}
	}
	return scopesConfig
}

// GetSingleDataStoreNamesFromScopes config returns a lexically sorted list of configured datastores.
func GetDataStoreNamesFromScopesConfig(config ScopesConfig) []sgbucket.DataStoreName {
var names []string
Expand Down
Loading