From b47a62f04f605bb528bb92124fd188e6becf5c51 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 8 Aug 2022 20:00:15 -0700 Subject: [PATCH] Use a prefix for scheduler workflow ids --- service/frontend/workflowHandler.go | 37 +++++++++++++++++++--------- service/worker/scheduler/workflow.go | 3 +++ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index dbad26cc31a..4ff4ce000c3 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -28,6 +28,7 @@ import ( "context" "encoding/binary" "fmt" + "strings" "sync/atomic" "time" "unicode/utf8" @@ -3016,8 +3017,9 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow return nil, errSchedulesNotAllowed } - // a schedule id is a workflow id so validate it the same way - if err := wh.validateWorkflowID(request.ScheduleId); err != nil { + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + + if err := wh.validateWorkflowID(workflowID); err != nil { return nil, err } @@ -3107,7 +3109,7 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow // Create StartWorkflowExecutionRequest startReq := &workflowservice.StartWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowId: request.ScheduleId, + WorkflowId: workflowID, WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType}, TaskQueue: &taskqueuepb.TaskQueue{Name: workercommon.PerNSWorkerTaskQueue}, Input: inputPayload, @@ -3154,7 +3156,8 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return nil, err } - execution := &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId} + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + execution := &commonpb.WorkflowExecution{WorkflowId: workflowID} // first describe to get memo and search attributes describeResponse, err := wh.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{ @@ -3319,6 +3322,8 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow return nil, errRequestIDTooLong } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3342,7 +3347,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow sizeLimitWarn, sizeLimitError, namespaceID.String(), - request.GetScheduleId(), + workflowID, "", // don't have runid yet wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, @@ -3355,7 +3360,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow NamespaceId: namespaceID.String(), SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, SignalName: scheduler.SignalNameUpdate, Input: inputPayloads, Identity: request.Identity, @@ -3393,6 +3398,8 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows return nil, errRequestIDTooLong } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3415,7 +3422,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows sizeLimitWarn, sizeLimitError, namespaceID.String(), - request.GetScheduleId(), + workflowID, "", // don't have runid yet wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, @@ -3428,7 +3435,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows NamespaceId: namespaceID.String(), SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, SignalName: scheduler.SignalNamePatch, Input: inputPayloads, Identity: request.Identity, @@ -3462,6 +3469,8 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques return nil, errSchedulesNotAllowed } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3479,7 +3488,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques sizeLimitWarn, sizeLimitError, namespaceID.String(), - request.ScheduleId, + workflowID, "", wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, @@ -3491,7 +3500,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques NamespaceId: namespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ Namespace: request.Namespace, - Execution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, Query: &querypb.WorkflowQuery{ QueryType: scheduler.QueryNameListMatchingTimes, QueryArgs: queryPayload, @@ -3532,6 +3541,8 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow return nil, errSchedulesNotAllowed } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3541,7 +3552,7 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow NamespaceId: namespaceID.String(), TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, Reason: "terminated by DeleteSchedule", Identity: request.Identity, }, @@ -3612,8 +3623,10 @@ func (wh *WorkflowHandler) ListSchedules(ctx context.Context, request *workflows info := wh.decodeScheduleListInfo(searchAttributes) searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes) memo := wh.cleanScheduleMemo(ex.GetMemo()) + workflowID := ex.GetExecution().GetWorkflowId() + scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix) schedules[i] = &schedpb.ScheduleListEntry{ - ScheduleId: ex.GetExecution().GetWorkflowId(), + ScheduleId: scheduleID, Memo: memo, SearchAttributes: searchAttributes, Info: info, diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 0c852658de2..fa0698327e6 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -52,6 +52,9 @@ import ( ) const ( + // Schedules are implemented by a workflow whose ID is this string plus the schedule ID. + WorkflowIDPrefix = "temporal-sys-scheduler:" + // This is an example of a timestamp that's appended to the workflow // id, used for validation in the frontend. AppendedTimestampForValidation = "-2009-11-10T23:00:00Z"