Skip to content

Commit

Permalink
feat: user suppressions adaptations for namespaces (#2604)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Nov 1, 2022
1 parent 768be08 commit 5c26d1b
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 91 deletions.
4 changes: 4 additions & 0 deletions config/backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,7 @@ func (m *mockIdentifier) ID() string {
func (m *mockIdentifier) BasicAuth() (string, string) {
return m.token, ""
}

func (*mockIdentifier) Type() deployment.Type {
return deployment.Type(`mockType`)
}
2 changes: 1 addition & 1 deletion enterprise/suppress-user/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package suppression

type NOOP struct{}

func (*NOOP) IsSuppressedUser(_, _ string) bool {
func (*NOOP) IsSuppressedUser(_, _, _ string) bool {
return false
}
4 changes: 1 addition & 3 deletions enterprise/suppress-user/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ func (m *Factory) Setup(backendConfig backendconfig.BackendConfig) (types.UserSu
loadConfig()
ctx := context.TODO()
backendConfig.WaitForConfig(ctx)
workspaceId := backendConfig.Identity().ID()
suppressUser := &SuppressRegulationHandler{
RegulationBackendURL: configBackendURL,
RegulationsPollInterval: regulationsPollInterval,
WorkspaceID: workspaceId,
ID: backendConfig.Identity(),
pageSize: strconv.Itoa(suppressionApiPageSize),
}
suppressUser.setup(ctx)
Expand Down
82 changes: 51 additions & 31 deletions enterprise/suppress-user/suppressUser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (

"github.com/cenkalti/backoff"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"

"github.com/rudderlabs/rudder-server/utils/logger"

Expand All @@ -26,8 +28,8 @@ type SuppressRegulationHandler struct {
Client *http.Client
RegulationBackendURL string
RegulationsPollInterval time.Duration
WorkspaceID string
userSpecificSuppressedSourceMap map[string]sourceFilter
ID identity.Identifier
userSpecificSuppressedSourceMap map[string]map[string]sourceFilter
regulationsSubscriberLock sync.RWMutex
suppressAPIToken string
pageSize string
Expand All @@ -47,29 +49,41 @@ type apiResponse struct {
}

type sourceRegulation struct {
Canceled bool `json:"canceled"`
UserID string `json:"userId"`
SourceIDs []string `json:"sourceIds"`
Canceled bool `json:"canceled"`
WorkspaceID string `json:"workspaceId"`
UserID string `json:"userId"`
SourceIDs []string `json:"sourceIds"`
}

func (suppressUser *SuppressRegulationHandler) setup(ctx context.Context) {
suppressUser.RegulationBackendURL = configBackendURL
switch suppressUser.ID.Type() {
case deployment.DedicatedType:
suppressUser.RegulationBackendURL += fmt.Sprintf("/dataplane/workspaces/%s/regulations/suppressions", suppressUser.ID.ID())
case deployment.MultiTenantType:
suppressUser.RegulationBackendURL += fmt.Sprintf("/dataplane/namespaces/%s/regulations/suppressions", suppressUser.ID.ID())
default:
panic("invalid deployment type")
}
rruntime.Go(func() {
suppressUser.regulationSyncLoop(ctx)
})
}

func (suppressUser *SuppressRegulationHandler) IsSuppressedUser(userID, sourceID string) bool {
func (suppressUser *SuppressRegulationHandler) IsSuppressedUser(workspaceID, userID, sourceID string) bool {
suppressUser.init()
pkgLogger.Debugf("IsSuppressedUser called for %v, %v", sourceID, userID)
pkgLogger.Debugf("IsSuppressedUser called for %v, %v, %v", workspaceID, sourceID, userID)
suppressUser.regulationsSubscriberLock.RLock()
defer suppressUser.regulationsSubscriberLock.RUnlock()
if _, ok := suppressUser.userSpecificSuppressedSourceMap[userID]; ok {
m := suppressUser.userSpecificSuppressedSourceMap[userID]
if m.all {
return true
}
if _, ok := m.specific[sourceID]; ok {
return true
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID]; ok {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userID]; ok {
m := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userID]
if m.all {
return true
}
if _, ok := m.specific[sourceID]; ok {
return true
}
}
}
return false
Expand Down Expand Up @@ -99,38 +113,43 @@ func (suppressUser *SuppressRegulationHandler) regulationSyncLoop(ctx context.Co
suppressUser.regulationsSubscriberLock.Lock()
for _, sourceRegulation := range regulations {
userId := sourceRegulation.UserID
workspaceID := sourceRegulation.WorkspaceID
_, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID]
if !ok {
suppressUser.userSpecificSuppressedSourceMap[workspaceID] = make(map[string]sourceFilter)
}
if len(sourceRegulation.SourceIDs) == 0 {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[userId]; !ok {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]; !ok {
if !sourceRegulation.Canceled {
m := sourceFilter{
all: true,
specific: map[string]struct{}{},
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
continue
}
}
m := suppressUser.userSpecificSuppressedSourceMap[userId]
m := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]
if sourceRegulation.Canceled {
m.all = false
} else {
m.all = true
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
} else {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[userId]; !ok {
if _, ok := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]; !ok {
if !sourceRegulation.Canceled {
m := sourceFilter{
specific: map[string]struct{}{},
}
for _, srcId := range sourceRegulation.SourceIDs {
m.specific[srcId] = struct{}{}
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
continue
}
}
m := suppressUser.userSpecificSuppressedSourceMap[userId]
m := suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId]
if sourceRegulation.Canceled {
for _, srcId := range sourceRegulation.SourceIDs {
delete(m.specific, srcId) // will be no-op if key is not there in map
Expand All @@ -140,7 +159,7 @@ func (suppressUser *SuppressRegulationHandler) regulationSyncLoop(ctx context.Co
m.specific[srcId] = struct{}{}
}
}
suppressUser.userSpecificSuppressedSourceMap[userId] = m
suppressUser.userSpecificSuppressedSourceMap[workspaceID][userId] = m
}
}
suppressUser.regulationsSubscriberLock.Unlock()
Expand All @@ -152,12 +171,7 @@ func (suppressUser *SuppressRegulationHandler) regulationSyncLoop(ctx context.Co
}

func (suppressUser *SuppressRegulationHandler) getSourceRegulationsFromRegulationService() ([]sourceRegulation, error) {
if config.GetBool("HOSTED_SERVICE", false) {
pkgLogger.Info("[Regulations] Regulations on free tier are not supported at the moment.")
return []sourceRegulation{}, nil
}

urlStr := fmt.Sprintf("%s/dataplane/workspaces/%s/regulations/suppressions", suppressUser.RegulationBackendURL, suppressUser.WorkspaceID)
urlStr := suppressUser.RegulationBackendURL
urlValQuery := url.Values{}
if suppressUser.pageSize != "" {
urlValQuery.Set("pageSize", suppressUser.pageSize)
Expand All @@ -179,8 +193,7 @@ func (suppressUser *SuppressRegulationHandler) getSourceRegulationsFromRegulatio
if err != nil {
return err
}
workspaceToken := config.GetWorkspaceToken()
req.SetBasicAuth(workspaceToken, "")
req.SetBasicAuth(suppressUser.ID.BasicAuth())
req.Header.Set("Content-Type", "application/json")

resp, err = suppressUser.Client.Do(req)
Expand Down Expand Up @@ -228,6 +241,13 @@ func (suppressUser *SuppressRegulationHandler) getSourceRegulationsFromRegulatio
pkgLogger.Error("Error while parsing request: ", err, resp.StatusCode)
return []sourceRegulation{}, err
}
// TODO: remove this once regulation Service is updated
for i := range sourceRegulationsJSON.SourceRegulations {
sourceRegulation := &sourceRegulationsJSON.SourceRegulations[i]
if sourceRegulation.WorkspaceID == "" {
sourceRegulation.WorkspaceID = suppressUser.ID.ID()
}
}

if sourceRegulationsJSON.Token == "" {
pkgLogger.Errorf("[[ Workspace-config ]] No token found in the source regulations response: %v", string(respBody))
Expand All @@ -241,7 +261,7 @@ func (suppressUser *SuppressRegulationHandler) init() {
suppressUser.once.Do(func() {
pkgLogger.Info("init Regulations")
if len(suppressUser.userSpecificSuppressedSourceMap) == 0 {
suppressUser.userSpecificSuppressedSourceMap = map[string]sourceFilter{}
suppressUser.userSpecificSuppressedSourceMap = map[string]map[string]sourceFilter{}
}
if suppressUser.Client == nil {
suppressUser.Client = &http.Client{Timeout: config.GetDuration("HttpClient.suppressUser.timeout", 30, time.Second)}
Expand Down
Loading

0 comments on commit 5c26d1b

Please sign in to comment.