diff --git a/regulation-worker/cmd/main.go b/regulation-worker/cmd/main.go index b57b524219..c54c04cd71 100644 --- a/regulation-worker/cmd/main.go +++ b/regulation-worker/cmd/main.go @@ -80,9 +80,10 @@ func Run(ctx context.Context) { FilesLimit: config.GetInt("REGULATION_WORKER_FILES_LIMIT", 1000), }, &api.APIManager{ - Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)}, - DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"), - OAuth: OAuth, + Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)}, + DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"), + OAuth: OAuth, + MaxOAuthRefreshRetryAttempts: config.GetInt("RegulationWorker.oauth.maxRefreshRetryAttempts", 1), }), } diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index 3074ec211e..f94a4dd4b0 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -27,9 +27,10 @@ var ( ) type APIManager struct { - Client *http.Client - DestTransformURL string - OAuth oauth.Authorizer + Client *http.Client + DestTransformURL string + OAuth oauth.Authorizer + MaxOAuthRefreshRetryAttempts int } type oauthDetail struct { @@ -41,9 +42,7 @@ func (*APIManager) GetSupportedDestinations() []string { return supportedDestinations } -// prepares payload based on (job,destDetail) & make an API call to transformer. -// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller. -func (api *APIManager) Delete(ctx context.Context, job model.Job, destination model.Destination) model.JobStatus { +func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, destination model.Destination, currentOauthRetryAttempt int) model.JobStatus { pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destination.Name) method := http.MethodPost endpoint := "/deleteUsers" @@ -106,22 +105,28 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destination mo return model.JobStatusFailed } jobStatus := getJobStatus(resp.StatusCode, jobResp) - pkgLogger.Debugf("[%v] JobStatus for %v: %v", destination.Name, destination.DestinationID, jobStatus) + pkgLogger.Debugf("[%v] Job: %v, JobStatus: %v", destination.Name, job.ID, jobStatus) - if isOAuthEnabled && isTokenExpired(jobResp) { + if isOAuthEnabled && isTokenExpired(jobResp) && currentOauthRetryAttempt < api.MaxOAuthRefreshRetryAttempts { err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail) if err != nil { pkgLogger.Error(err) return model.JobStatusFailed } // retry the request - pkgLogger.Debug("Retrying deleteRequest job for the whole batch") - return api.Delete(ctx, job, destination) + pkgLogger.Infof("[%v] Retrying deleteRequest job(id: %v) for the whole batch, RetryAttempt: %v", destination.Name, job.ID, currentOauthRetryAttempt+1) + return api.deleteWithRetry(ctx, job, destination, currentOauthRetryAttempt+1) } return jobStatus } +// prepares payload based on (job,destDetail) & make an API call to transformer. +// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller. +func (api *APIManager) Delete(ctx context.Context, job model.Job, destination model.Destination) model.JobStatus { + return api.deleteWithRetry(ctx, job, destination, 0) +} + func getJobStatus(statusCode int, jobResp []JobRespSchema) model.JobStatus { switch statusCode { diff --git a/services/oauth/oauth.go b/services/oauth/oauth.go index 94e243ee66..4677cbcecf 100644 --- a/services/oauth/oauth.go +++ b/services/oauth/oauth.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "os" "strconv" "strings" "sync" @@ -318,7 +319,6 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams authStats.statName = fmt.Sprintf(`%v_request_latency`, refTokenParams.EventNamePrefix) authStats.SendTimerStats(cpiCallStartTime) - authErrHandler.logger.Debugf("[%s] Got the response from Control-Plane: rt-worker-%d\n", loggerNm, refTokenParams.WorkerId) authErrHandler.logger.Debugf("[%s] Got the response from Control-Plane: rt-worker-%d with statusCode: %d\n", loggerNm, refTokenParams.WorkerId, statusCode) // Empty Refresh token response @@ -369,7 +369,7 @@ func (authErrHandler *OAuthErrResHandler) fetchAccountInfoFromCp(refTokenParams func getRefreshTokenErrResp(response string, accountSecret *AccountSecret) (message string) { if err := json.Unmarshal([]byte(response), &accountSecret); err != nil { // Some problem with AccountSecret unmarshalling - message = err.Error() + message = fmt.Sprintf("Unmarshal of response unsuccessful: %v", response) } else if gjson.Get(response, "body.code").String() == INVALID_REFRESH_TOKEN_GRANT { // User (or) AccessToken (or) RefreshToken has been revoked message = INVALID_REFRESH_TOKEN_GRANT @@ -515,6 +515,14 @@ func processResponse(resp *http.Response) (statusCode int, respBody string) { } func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT) (int, string) { + cpStatTags := stats.Tags{ + "url": cpReq.Url, + "requestType": cpReq.RequestType, + "destType": cpReq.destName, + "method": cpReq.Method, + "flowType": string(authErrHandler.rudderFlowType), + } + var reqBody *bytes.Buffer var req *http.Request var err error @@ -539,15 +547,14 @@ func (authErrHandler *OAuthErrResHandler) cpApiCall(cpReq *ControlPlaneRequestT) cpApiDoTimeStart := time.Now() res, doErr := authErrHandler.client.Do(req) - stats.Default.NewTaggedStat("cp_request_latency", stats.TimerType, stats.Tags{ - "url": cpReq.Url, - "destination": cpReq.destName, - "requestType": cpReq.RequestType, - }).SendTiming(time.Since(cpApiDoTimeStart)) + stats.Default.NewTaggedStat("cp_request_latency", stats.TimerType, cpStatTags).SendTiming(time.Since(cpApiDoTimeStart)) authErrHandler.logger.Debugf("[%s request] :: destination request sent\n", loggerNm) if doErr != nil { // Abort on receiving an error authErrHandler.logger.Errorf("[%s request] :: destination request failed: %+v\n", loggerNm, doErr) + if os.IsTimeout(doErr) { + stats.Default.NewTaggedStat("cp_request_timeout", stats.CountType, cpStatTags) + } return http.StatusBadRequest, doErr.Error() } if res.Body != nil {