Skip to content

Commit

Permalink
[fix] Hex encode workflow name before passing to engine
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Jan 17, 2025
1 parent 1a260c0 commit 0ac27f8
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 41 deletions.
40 changes: 20 additions & 20 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,23 +1191,23 @@ func (e *Engine) Name() string {
}

type Config struct {
Workflow sdk.WorkflowSpec
WorkflowID string
WorkflowOwner string
WorkflowName string // Full human-readable workflow name. Intended for metrics and logging.
WorkflowNameTransform string // The Workflow Name in an on-chain format, which has requirements of being hex encoded and max 10 bytes
Lggr logger.Logger
Registry core.CapabilitiesRegistry
MaxWorkerLimit int
QueueSize int
NewWorkerTimeout time.Duration
MaxExecutionDuration time.Duration
Store store.Store
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration
StepTimeout time.Duration
Workflow sdk.WorkflowSpec
WorkflowID string
WorkflowOwner string
WorkflowName string // Full human-readable workflow name. Intended for metrics and logging.
WorkflowHexName string // The Workflow Name in an on-chain format, which has requirements of being 10 bytes long, hex encoded (i.e. 20 hex characters long). If not provided, we'll derive it from the workflowName.
Lggr logger.Logger
Registry core.CapabilitiesRegistry
MaxWorkerLimit int
QueueSize int
NewWorkerTimeout time.Duration
MaxExecutionDuration time.Duration
Store store.Store
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration
StepTimeout time.Duration

// For testing purposes only
maxRetries int
Expand Down Expand Up @@ -1298,11 +1298,11 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {

workflow.id = cfg.WorkflowID
workflow.owner = cfg.WorkflowOwner
workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))
workflow.name = cfg.WorkflowName

if len(cfg.WorkflowNameTransform) > 0 {
workflow.hexName = cfg.WorkflowNameTransform
workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))
if cfg.WorkflowHexName != "" {
workflow.hexName = cfg.WorkflowHexName
}

engine = &Engine{
Expand Down
20 changes: 12 additions & 8 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap {
}
}

type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
Expand Down Expand Up @@ -467,12 +467,16 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

truncatedName := pkgworkflows.HashTruncateName(payload.WorkflowName)
hexName := hex.EncodeToString([]byte(truncatedName))

// If status == active, start a new WorkflowEngine instance, and add it to local engine registry
engine, err := h.engineFactory(
ctx,
wfID,
owner,
payload.WorkflowName,
hexName,
config,
decodedBinary,
)
Expand Down Expand Up @@ -526,7 +530,7 @@ func (h *eventHandler) getWorkflowArtifacts(
return decodedBinary, config, nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
Expand All @@ -542,12 +546,12 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner str
// Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use.
// A name is used internally that is first hashed to avoid collisions,
// hex encoded to ensure UTF8 encoding, then truncated to 10 bytes.
WorkflowNameTransform: pkgworkflows.HashTruncateName(name),
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h,
WorkflowHexName: hexName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h,
}
return workflows.NewEngine(ctx, cfg)
}
Expand Down
64 changes: 51 additions & 13 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -204,16 +205,17 @@ func Test_workflowRegisteredHandler(t *testing.T) {
var wfOwner = []byte("0xOwner")
var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
var encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary))
var workflowName = "workflow-name"

defaultValidationFn := func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), workflowName)
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, workflowName, dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
Expand All @@ -231,7 +233,43 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{}, nil
},
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: defaultValidationFn,
},
{
Name: "correctly generates the workflow name",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: encodedBinary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
if _, err := hex.DecodeString(hexName); err != nil {
return nil, fmt.Errorf("invalid workflow name: %w", err)
}
want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(workflowName)))
if want != hexName {
return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", hexName, want)
}
return &mockEngine{}, nil
},
GiveConfig: config,
Expand All @@ -245,7 +283,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand All @@ -260,7 +298,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{StartErr: assert.AnError}, nil
},
GiveConfig: config,
Expand All @@ -274,7 +312,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand Down Expand Up @@ -304,7 +342,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand Down Expand Up @@ -336,7 +374,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(1),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand All @@ -347,10 +385,10 @@ func Test_workflowRegisteredHandler(t *testing.T) {
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), workflowName)
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, workflowName, dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status)

// Verify there is no running engine
Expand All @@ -376,7 +414,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
SecretsURL: secretsURL,
}
Expand All @@ -399,7 +437,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
}
Expand All @@ -423,7 +461,7 @@ type testCase struct {
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error)
}

func testRunningWorkflow(t *testing.T, tc testCase) {
Expand Down

0 comments on commit 0ac27f8

Please sign in to comment.