Add verification of replication tasks in force replication (#4630)
**What changed?**
Add a verification step to check if generated workflow executions exist
on target cluster.

**Why?**
To ensure all generated replication tasks have been successfully applied
on target cluster.

**How did you test it?**
Unit tests + cluster tests

**Potential risks**


**Is hotfix candidate?**
No
hehaifengcn committed Jul 21, 2023
1 parent 1d5601a commit 6247d88
Showing 7 changed files with 651 additions and 59 deletions.
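The diff in service/worker/migration/activities.go implements the verification as a generate-then-poll loop. Below is a minimal, self-contained sketch of that flow for orientation; the `execution`, `targetCluster`, `createTask`, and `errNotFound` names are invented for this illustration and are not Temporal server APIs. The real activity additionally rate-limits task generation, records heartbeat details, skips zombie workflows, and distinguishes retryable from non-retryable no-progress timeouts.

```go
// Package replicationsketch is an illustrative sketch, not code from this PR.
package replicationsketch

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// execution, targetCluster, createTask, and errNotFound are hypothetical
// stand-ins invented for this sketch.
type execution struct{ WorkflowID, RunID string }

var errNotFound = errors.New("workflow not found on target cluster")

type targetCluster interface {
	// DescribeExecution returns errNotFound while the execution has not yet
	// been replicated to the target cluster.
	DescribeExecution(ctx context.Context, e execution) error
}

// generateAndVerify mirrors the shape of the new activity: create a replication
// task per execution, then repeatedly re-check the target cluster until every
// execution is visible or no progress has been made for noProgressTimeout.
func generateAndVerify(
	ctx context.Context,
	execs []execution,
	createTask func(execution) error,
	target targetCluster,
	verifyInterval, noProgressTimeout time.Duration,
) error {
	verified := make([]bool, len(execs))
	checkpoint := time.Now()

	for _, e := range execs {
		if err := createTask(e); err != nil {
			return err
		}
	}

	for {
		progress, allDone := false, true
		for i, e := range execs {
			if verified[i] {
				continue
			}
			err := target.DescribeExecution(ctx, e)
			switch {
			case err == nil:
				verified[i] = true
				progress = true
			case errors.Is(err, errNotFound):
				allDone = false // likely replication lag; check again next round
			default:
				return err
			}
		}
		if allDone {
			return nil
		}
		if progress {
			checkpoint = time.Now() // progress resets the no-progress clock
		} else if time.Since(checkpoint) > noProgressTimeout {
			return fmt.Errorf("no verification progress for %v", time.Since(checkpoint))
		}
		time.Sleep(verifyInterval)
	}
}
```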
6 changes: 6 additions & 0 deletions common/metrics/metric_defs.go
@@ -1655,6 +1655,12 @@ var (
ScheduleCancelWorkflowErrors = NewCounterDef("schedule_cancel_workflow_errors")
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")

// Force replication
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
CreateReplicationTasksLatency = NewTimerDef("create_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
NamespaceReplicationDLQAckLevelGauge = NewGaugeDef("namespace_dlq_ack_level")
293 changes: 276 additions & 17 deletions service/worker/migration/activities.go
@@ -38,9 +38,11 @@ import (
"go.temporal.io/sdk/activity"

"go.temporal.io/sdk/temporal"
"go.temporal.io/server/api/adminservice/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client/admin"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log/tag"
@@ -51,6 +53,65 @@
"go.temporal.io/server/common/util"
)

type (
VerifyStatus int
VerifyResult struct {
Status VerifyStatus
Reason string
}

replicationTasksHeartbeatDetails struct {
Results []VerifyResult
CheckPoint time.Time
LastNotFoundWorkflowExecution commonpb.WorkflowExecution
}

verifyReplicationTasksTimeoutErr struct {
timeout time.Duration
details replicationTasksHeartbeatDetails
}
)

// State Diagram
//
// NOT_CREATED
// │
// │
// CREATED_TO_BE_VERIFIED
// │
// ┌────────┴─────────┐
// │ │
// VERIFIED VERIFY_SKIPPED
const (
NOT_CREATED VerifyStatus = 0
CREATED_TO_BE_VERIFIED VerifyStatus = 1
VERIFIED VerifyStatus = 2
VERIFY_SKIPPED VerifyStatus = 3

reasonZombieWorkflow = "Zombie workflow"
reasonWorkflowNotFound = "Workflow not found"
)

func (r VerifyResult) isNotCreated() bool {
return r.Status == NOT_CREATED
}

func (r VerifyResult) isCreatedToBeVerified() bool {
return r.Status == CREATED_TO_BE_VERIFIED
}

func (r VerifyResult) isVerified() bool {
return r.Status == VERIFIED
}

func (r VerifyResult) isSkipped() bool {
return r.Status == VERIFY_SKIPPED
}

func (r VerifyResult) isCompleted() bool {
return r.isVerified() || r.isSkipped()
}

// TODO: CallerTypePreemptable should be set in activity background context for all migration activities.
// However, activity background context is per-worker, which means once set, all activities processed by the
// worker will use CallerTypePreemptable, including those not related to migration. This is not ideal.
@@ -275,20 +336,18 @@ func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLi
},
})

- switch err.(type) {
- case nil:
- stateTransitionCount := resp.StateTransitionCount
- for stateTransitionCount > 0 {
- token := util.Min(int(stateTransitionCount), rateLimiter.Burst())
- stateTransitionCount -= int64(token)
- _ = rateLimiter.ReserveN(time.Now(), token)
- }
- return nil
- case *serviceerror.NotFound:
- return nil
- default:
+ if err != nil {
return err
}
+
+ stateTransitionCount := resp.StateTransitionCount
+ for stateTransitionCount > 0 {
+ token := util.Min(int(stateTransitionCount), rateLimiter.Burst())
+ stateTransitionCount -= int64(token)
+ _ = rateLimiter.ReserveN(time.Now(), token)
+ }
+
+ return nil
}

func (a *activities) UpdateNamespaceState(ctx context.Context, req updateStateRequest) error {
@@ -379,10 +438,11 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene

for i := startIndex; i < len(request.Executions); i++ {
we := request.Executions[i]
- err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
- if err != nil {
- a.logger.Info("Force replicate failed", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err))
- return err
+ if err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId)); err != nil {
+ if _, isNotFound := err.(*serviceerror.NotFound); !isNotFound {
+ a.logger.Error("force-replication failed to generate replication task", tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId), tag.Error(err))
+ return err
+ }
}
activity.RecordHeartbeat(ctx, i)
}
@@ -394,7 +454,6 @@ func (a *activities) setCallerInfoForGenReplicationTask(
ctx context.Context,
namespaceID namespace.ID,
) context.Context {

nsName, err := a.namespaceRegistry.GetNamespaceName(namespaceID)
if err != nil {
a.logger.Error("Failed to get namespace name when generating replication task",
@@ -482,3 +541,203 @@ func (a *activities) SeedReplicationQueueWithUserDataEntries(ctx context.Context
activity.RecordHeartbeat(ctx, heartbeatDetails)
}
}

func (a *activities) createReplicationTasks(ctx context.Context, request *genearteAndVerifyReplicationTasksRequest, detail *replicationTasksHeartbeatDetails) error {
start := time.Now()
defer func() {
a.forceReplicationMetricsHandler.Timer(metrics.CreateReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
}()

rateLimiter := quotas.NewRateLimiter(request.RPS, int(math.Ceil(request.RPS)))

for i := 0; i < len(request.Executions); i++ {
r := &detail.Results[i]
if r.isCompleted() {
continue
}

we := request.Executions[i]
tags := []tag.Tag{tag.WorkflowType(forceReplicationWorkflowName), tag.WorkflowNamespaceID(request.NamespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}

resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
NamespaceId: request.NamespaceID,
Execution: &we,
})

switch err.(type) {
case nil:
if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)

r.Status = VERIFY_SKIPPED
r.Reason = reasonZombieWorkflow
continue
}

// Only create replication task if it hasn't been already created
if r.isNotCreated() {
err := a.generateWorkflowReplicationTask(ctx, rateLimiter, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))

switch err.(type) {
case nil:
r.Status = CREATED_TO_BE_VERIFIED
case *serviceerror.NotFound:
// Rare case: the execution could have been deleted after the DescribeMutableState call above.
r.Status = VERIFY_SKIPPED
r.Reason = reasonWorkflowNotFound
default:
a.logger.Error(fmt.Sprintf("createReplicationTasks failed to generate replication task. Error: %v", err), tags...)
return err
}
}

case *serviceerror.NotFound:
r.Status = VERIFY_SKIPPED
r.Reason = reasonWorkflowNotFound

default:
return err
}
}

return nil
}

func (a *activities) verifyReplicationTasks(
ctx context.Context,
request *genearteAndVerifyReplicationTasksRequest,
detail *replicationTasksHeartbeatDetails,
remoteClient adminservice.AdminServiceClient,
) (verified bool, progress bool, err error) {
start := time.Now()
defer func() {
a.forceReplicationMetricsHandler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
}()

progress = false
for i := 0; i < len(request.Executions); i++ {
r := &detail.Results[i]
we := request.Executions[i]
if r.isNotCreated() {
// invalid state
return false, progress, temporal.NewNonRetryableApplicationError(fmt.Sprintf("verifyReplicationTasks: replication task for %v was not created", we), "", nil)
}

if r.isCompleted() {
continue
}

// Check if execution exists on remote cluster
_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
Namespace: request.Namespace,
Execution: &we,
})

switch err.(type) {
case nil:
a.forceReplicationMetricsHandler.Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
r.Status = VERIFIED
progress = true

case *serviceerror.NotFound:
detail.LastNotFoundWorkflowExecution = we
return false, progress, nil

default:
return false, progress, err
}
}

return true, progress, nil
}

func (e verifyReplicationTasksTimeoutErr) Error() string {
return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,",
e.timeout,
e.details.LastNotFoundWorkflowExecution,
)
}

const (
defaultNoProgressRetryableTimeout = 5 * time.Minute
defaultNoProgressNotRetryableTimeout = 15 * time.Minute
)

func (a *activities) GenerateAndVerifyReplicationTasks(ctx context.Context, request *genearteAndVerifyReplicationTasksRequest) error {
ctx = headers.SetCallerInfo(ctx, headers.NewPreemptableCallerInfo(request.Namespace))
remoteClient := a.clientFactory.NewRemoteAdminClientWithTimeout(
request.TargetClusterEndpoint,
admin.DefaultTimeout,
admin.DefaultLargeTimeout,
)

var details replicationTasksHeartbeatDetails
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &details); err != nil {
return err
}
} else {
details.Results = make([]VerifyResult, len(request.Executions))
details.CheckPoint = time.Now()
activity.RecordHeartbeat(ctx, details)
}

if err := a.createReplicationTasks(ctx, request, &details); err != nil {
return err
}

activity.RecordHeartbeat(ctx, details)

// Verify whether the replicated executions exist on the target cluster. There are several cases in which an execution may not be found on the target cluster:
// 1. replication lag
// 2. zombie workflow execution
// 3. workflow execution was deleted (due to retention) after the replication task was created
// 4. workflow execution was not applied successfully on the target cluster (i.e., a bug)
//
// The verification step is retried every VerifyInterval to handle #1. Verification progress
// is recorded in the activity heartbeat. Verification is considered to be making progress if at least one new execution
// was verified since the last checkpoint. If no progress is made for long enough, then
// - after more than RetryableTimeout, the activity fails with a retryable error and restarts. This gives us the chance
//   to identify cases #2 and #3 by rerunning createReplicationTasks.
// - after more than NonRetryableTimeout, we have potentially hit #4. The activity returns a
//   non-retryable error and the force-replication workflow will be restarted.
for {
var verified, progress bool
var err error

if verified, progress, err = a.verifyReplicationTasks(ctx, request, &details, remoteClient); err != nil {
return err
}

if progress {
details.CheckPoint = time.Now()
}

activity.RecordHeartbeat(ctx, details)

if verified {
return nil
}

diff := time.Since(details.CheckPoint)
if diff > defaultNoProgressRetryableTimeout {
if diff > defaultNoProgressNotRetryableTimeout {
// Potentially encountered a missing execution, return non-retryable error
return temporal.NewNonRetryableApplicationError(
fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (not retryable). Not found WorkflowExecution: %v, Checkpoint: %v",
diff.Minutes(),
details.LastNotFoundWorkflowExecution, details.CheckPoint),
"", nil)
}

// return error to trigger activity retry
return verifyReplicationTasksTimeoutErr{
timeout: diff,
details: details,
}
}

time.Sleep(request.VerifyInterval)
}
}
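
The no-progress handling at the bottom of this loop encodes two thresholds: a 5-minute retryable timeout and a 15-minute non-retryable one. As a reading aid, the decision can be restated as a small pure function over the time elapsed since the last heartbeat checkpoint. This is a sketch with invented names (progressDecision, decide), not code from the PR:

```go
// Package replicationsketch is an illustrative sketch, not code from this PR.
package replicationsketch

import "time"

// progressDecision restates the three outcomes of the activity's no-progress check.
type progressDecision int

const (
	keepPolling     progressDecision = iota // still within the retryable window
	retryActivity                           // fail with a retryable error so createReplicationTasks reruns
	failPermanently                         // likely an execution that was never applied on the target
)

// decide maps the time since the last verification progress (the heartbeat
// checkpoint) onto the outcome used by the verification loop.
func decide(sinceLastProgress, retryable, nonRetryable time.Duration) progressDecision {
	switch {
	case sinceLastProgress > nonRetryable:
		return failPermanently
	case sinceLastProgress > retryable:
		return retryActivity
	default:
		return keepPolling
	}
}
```

For example, decide(12*time.Minute, 5*time.Minute, 15*time.Minute) yields retryActivity, which corresponds to the loop returning verifyReplicationTasksTimeoutErr, while anything past 15 minutes corresponds to the non-retryable application error.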
