Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Hex encode workflow name before passing to engine #15971

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (m *mockService) Name() string { return "svc" }

type mockEngineFactory struct{}

func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
return &mockService{}, nil
}

Expand Down
40 changes: 20 additions & 20 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,23 +1191,23 @@ func (e *Engine) Name() string {
}

type Config struct {
Workflow sdk.WorkflowSpec
WorkflowID string
WorkflowOwner string
WorkflowName string // Full human-readable workflow name. Intended for metrics and logging.
WorkflowNameTransform string // The Workflow Name in an on-chain format, which has requirements of being hex encoded and max 10 bytes
Lggr logger.Logger
Registry core.CapabilitiesRegistry
MaxWorkerLimit int
QueueSize int
NewWorkerTimeout time.Duration
MaxExecutionDuration time.Duration
Store store.Store
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration
StepTimeout time.Duration
Workflow sdk.WorkflowSpec
WorkflowID string
WorkflowOwner string
WorkflowName string // Full human-readable workflow name. Intended for metrics and logging.
WorkflowHexName string // The Workflow Name in an on-chain format, which has requirements of being 10 bytes long, hex encoded (i.e. 20 hex characters long). If not provided, we'll derive it from the workflowName.
Lggr logger.Logger
Registry core.CapabilitiesRegistry
MaxWorkerLimit int
QueueSize int
NewWorkerTimeout time.Duration
MaxExecutionDuration time.Duration
Store store.Store
Config []byte
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration
StepTimeout time.Duration

// For testing purposes only
maxRetries int
Expand Down Expand Up @@ -1298,11 +1298,11 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {

workflow.id = cfg.WorkflowID
workflow.owner = cfg.WorkflowOwner
workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))
workflow.name = cfg.WorkflowName

if len(cfg.WorkflowNameTransform) > 0 {
workflow.hexName = cfg.WorkflowNameTransform
workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))
if cfg.WorkflowHexName != "" {
workflow.hexName = cfg.WorkflowHexName
}

engine = &Engine{
Expand Down
20 changes: 12 additions & 8 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap {
}
}

type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
Expand Down Expand Up @@ -467,12 +467,16 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

truncatedName := pkgworkflows.HashTruncateName(payload.WorkflowName)
hexName := hex.EncodeToString([]byte(truncatedName))

// If status == active, start a new WorkflowEngine instance, and add it to local engine registry
engine, err := h.engineFactory(
ctx,
wfID,
owner,
payload.WorkflowName,
hexName,
config,
decodedBinary,
)
Expand Down Expand Up @@ -526,7 +530,7 @@ func (h *eventHandler) getWorkflowArtifacts(
return decodedBinary, config, nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
Expand All @@ -542,12 +546,12 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner str
// Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use.
// A name is used internally that is first hashed to avoid collisions,
// hex encoded to ensure UTF8 encoding, then truncated to 10 bytes.
WorkflowNameTransform: pkgworkflows.HashTruncateName(name),
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h,
WorkflowHexName: hexName,
Registry: h.capRegistry,
Store: h.workflowStore,
Config: config,
Binary: binary,
SecretsFetcher: h,
}
return workflows.NewEngine(ctx, cfg)
}
Expand Down
64 changes: 51 additions & 13 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -204,16 +205,17 @@ func Test_workflowRegisteredHandler(t *testing.T) {
var wfOwner = []byte("0xOwner")
var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
var encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary))
var workflowName = "workflow-name"

defaultValidationFn := func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), workflowName)
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, workflowName, dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status)

// Verify the engine is started
Expand All @@ -231,7 +233,43 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{}, nil
},
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: defaultValidationFn,
},
{
Name: "correctly generates the workflow name",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: encodedBinary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
if _, err := hex.DecodeString(hexName); err != nil {
return nil, fmt.Errorf("invalid workflow name: %w", err)
}
want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(workflowName)))
if want != hexName {
return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", hexName, want)
}
return &mockEngine{}, nil
},
GiveConfig: config,
Expand All @@ -245,7 +283,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand All @@ -260,7 +298,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{StartErr: assert.AnError}, nil
},
GiveConfig: config,
Expand All @@ -274,7 +312,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand Down Expand Up @@ -304,7 +342,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand Down Expand Up @@ -336,7 +374,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(1),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand All @@ -347,10 +385,10 @@ func Test_workflowRegisteredHandler(t *testing.T) {
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), workflowName)
require.NoError(t, err)
require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner)
require.Equal(t, "workflow-name", dbSpec.WorkflowName)
require.Equal(t, workflowName, dbSpec.WorkflowName)
require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status)

// Verify there is no running engine
Expand All @@ -376,7 +414,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
SecretsURL: secretsURL,
}
Expand All @@ -399,7 +437,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
WorkflowName: workflowName,
BinaryURL: binaryURL,
ConfigURL: configURL,
}
Expand All @@ -423,7 +461,7 @@ type testCase struct {
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error)
}

func testRunningWorkflow(t *testing.T, tc testCase) {
Expand Down
Loading