Skip to content

Commit

Permalink
Use a prefix for scheduler workflow ids (#3201)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Aug 10, 2022
1 parent 3c6abda commit c8c2777
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
37 changes: 25 additions & 12 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"encoding/binary"
"fmt"
"strings"
"sync/atomic"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -3016,8 +3017,9 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
return nil, errSchedulesNotAllowed
}

// a schedule id is a workflow id so validate it the same way
if err := wh.validateWorkflowID(request.ScheduleId); err != nil {
workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

if err := wh.validateWorkflowID(workflowID); err != nil {
return nil, err
}

Expand Down Expand Up @@ -3109,7 +3111,7 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
// Create StartWorkflowExecutionRequest
startReq := &workflowservice.StartWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowId: request.ScheduleId,
WorkflowId: workflowID,
WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: workercommon.PerNSWorkerTaskQueue},
Input: inputPayload,
Expand Down Expand Up @@ -3156,7 +3158,8 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return nil, err
}

execution := &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}
workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId
execution := &commonpb.WorkflowExecution{WorkflowId: workflowID}

// first describe to get memo and search attributes
describeResponse, err := wh.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{
Expand Down Expand Up @@ -3321,6 +3324,8 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
return nil, errRequestIDTooLong
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3344,7 +3349,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.GetScheduleId(),
workflowID,
"", // don't have runid yet
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
Expand All @@ -3357,7 +3362,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
NamespaceId: namespaceID.String(),
SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
SignalName: scheduler.SignalNameUpdate,
Input: inputPayloads,
Identity: request.Identity,
Expand Down Expand Up @@ -3395,6 +3400,8 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
return nil, errRequestIDTooLong
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3417,7 +3424,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.GetScheduleId(),
workflowID,
"", // don't have runid yet
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
Expand All @@ -3430,7 +3437,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows
NamespaceId: namespaceID.String(),
SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
SignalName: scheduler.SignalNamePatch,
Input: inputPayloads,
Identity: request.Identity,
Expand Down Expand Up @@ -3464,6 +3471,8 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
return nil, errSchedulesNotAllowed
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3481,7 +3490,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
request.ScheduleId,
workflowID,
"",
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
Expand All @@ -3493,7 +3502,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques
NamespaceId: namespaceID.String(),
Request: &workflowservice.QueryWorkflowRequest{
Namespace: request.Namespace,
Execution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
Query: &querypb.WorkflowQuery{
QueryType: scheduler.QueryNameListMatchingTimes,
QueryArgs: queryPayload,
Expand Down Expand Up @@ -3534,6 +3543,8 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow
return nil, errSchedulesNotAllowed
}

workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand All @@ -3543,7 +3554,7 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow
NamespaceId: namespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: request.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId},
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID},
Reason: "terminated by DeleteSchedule",
Identity: request.Identity,
},
Expand Down Expand Up @@ -3615,8 +3626,10 @@ func (wh *WorkflowHandler) ListSchedules(ctx context.Context, request *workflows
info := wh.decodeScheduleListInfo(searchAttributes)
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)
memo := wh.cleanScheduleMemo(ex.GetMemo())
workflowID := ex.GetExecution().GetWorkflowId()
scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix)
schedules[i] = &schedpb.ScheduleListEntry{
ScheduleId: ex.GetExecution().GetWorkflowId(),
ScheduleId: scheduleID,
Memo: memo,
SearchAttributes: searchAttributes,
Info: info,
Expand Down
3 changes: 3 additions & 0 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ import (
)

const (
// Schedules are implemented by a workflow whose ID is this string plus the schedule ID.
WorkflowIDPrefix = "temporal-sys-scheduler:"

// This is an example of a timestamp that's appended to the workflow
// id, used for validation in the frontend.
AppendedTimestampForValidation = "-2009-11-10T23:00:00Z"
Expand Down

0 comments on commit c8c2777

Please sign in to comment.