diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index f6e73bb8bc5..5a712bcbfae 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -1191,23 +1191,23 @@ func (e *Engine) Name() string { } type Config struct { - Workflow sdk.WorkflowSpec - WorkflowID string - WorkflowOwner string - WorkflowName string // Full human-readable workflow name. Intended for metrics and logging. - WorkflowNameTransform string // The Workflow Name in an on-chain format, which has requirements of being hex encoded and max 10 bytes - Lggr logger.Logger - Registry core.CapabilitiesRegistry - MaxWorkerLimit int - QueueSize int - NewWorkerTimeout time.Duration - MaxExecutionDuration time.Duration - Store store.Store - Config []byte - Binary []byte - SecretsFetcher secretsFetcher - HeartbeatCadence time.Duration - StepTimeout time.Duration + Workflow sdk.WorkflowSpec + WorkflowID string + WorkflowOwner string + WorkflowName string // Full human-readable workflow name. Intended for metrics and logging. + WorkflowHexName string // The Workflow Name in an on-chain format, which has requirements of being 10 bytes long, hex encoded (i.e. 20 hex characters long). If not provided, we'll derive it from the workflowName. + Lggr logger.Logger + Registry core.CapabilitiesRegistry + MaxWorkerLimit int + QueueSize int + NewWorkerTimeout time.Duration + MaxExecutionDuration time.Duration + Store store.Store + Config []byte + Binary []byte + SecretsFetcher secretsFetcher + HeartbeatCadence time.Duration + StepTimeout time.Duration // For testing purposes only maxRetries int @@ -1298,11 +1298,11 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { workflow.id = cfg.WorkflowID workflow.owner = cfg.WorkflowOwner - workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName)) workflow.name = cfg.WorkflowName - if len(cfg.WorkflowNameTransform) > 0 { - workflow.hexName = cfg.WorkflowNameTransform + workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName)) + if cfg.WorkflowHexName != "" { + workflow.hexName = cfg.WorkflowHexName } engine = &Engine{ diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index c8dbf94846d..3955ac95bf6 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -128,7 +128,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap { } } -type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) +type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. @@ -467,12 +467,16 @@ func (h *eventHandler) workflowRegisteredEvent( return nil } + truncatedName := pkgworkflows.HashTruncateName(payload.WorkflowName) + hexName := hex.EncodeToString([]byte(truncatedName)) + // If status == active, start a new WorkflowEngine instance, and add it to local engine registry engine, err := h.engineFactory( ctx, wfID, owner, payload.WorkflowName, + hexName, config, decodedBinary, ) @@ -526,7 +530,7 @@ func (h *eventHandler) getWorkflowArtifacts( return decodedBinary, config, nil } -func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) { +func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter} sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) if err != nil { @@ -542,12 +546,12 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner str // Internal workflow names must not exceed 10 bytes for workflow engine and on-chain use. // A name is used internally that is first hashed to avoid collisions, // hex encoded to ensure UTF8 encoding, then truncated to 10 bytes. - WorkflowNameTransform: pkgworkflows.HashTruncateName(name), - Registry: h.capRegistry, - Store: h.workflowStore, - Config: config, - Binary: binary, - SecretsFetcher: h, + WorkflowHexName: hexName, + Registry: h.capRegistry, + Store: h.workflowStore, + Config: config, + Binary: binary, + SecretsFetcher: h, } return workflows.NewEngine(ctx, cfg) } diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 3b3231a3d3d..e7713db2d0e 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "testing" "time" @@ -204,16 +205,17 @@ func Test_workflowRegisteredHandler(t *testing.T) { var wfOwner = []byte("0xOwner") var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) var encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + var workflowName = "workflow-name" defaultValidationFn := func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) { err := h.workflowRegisteredEvent(ctx, event) require.NoError(t, err) // Verify the record is updated in the database - dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), workflowName) require.NoError(t, err) require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, workflowName, dbSpec.WorkflowName) require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) // Verify the engine is started @@ -231,7 +233,43 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { + return &mockEngine{}, nil + }, + GiveConfig: config, + ConfigURL: configURL, + SecretsURL: secretsURL, + BinaryURL: binaryURL, + GiveBinary: binary, + WFOwner: wfOwner, + Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 { + return WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: workflowName, + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + }, + validationFn: defaultValidationFn, + }, + { + Name: "correctly generates the workflow name", + fetcher: newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: encodedBinary, Err: nil}, + configURL: {Body: config, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }), + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { + if _, err := hex.DecodeString(hexName); err != nil { + return nil, fmt.Errorf("invalid workflow name: %w", err) + } + want := hex.EncodeToString([]byte(pkgworkflows.HashTruncateName(workflowName))) + if want != hexName { + return nil, fmt.Errorf("invalid workflow name: doesn't match, got %s, want %s", hexName, want) + } return &mockEngine{}, nil }, GiveConfig: config, @@ -245,7 +283,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { Status: uint8(0), WorkflowID: [32]byte(wfID), WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", + WorkflowName: workflowName, BinaryURL: binaryURL, ConfigURL: configURL, SecretsURL: secretsURL, @@ -260,7 +298,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) { return &mockEngine{StartErr: assert.AnError}, nil }, GiveConfig: config, @@ -274,7 +312,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { Status: uint8(0), WorkflowID: [32]byte(wfID), WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", + WorkflowName: workflowName, BinaryURL: binaryURL, ConfigURL: configURL, SecretsURL: secretsURL, @@ -304,7 +342,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { Status: uint8(0), WorkflowID: [32]byte(wfID), WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", + WorkflowName: workflowName, BinaryURL: binaryURL, ConfigURL: configURL, SecretsURL: secretsURL, @@ -336,7 +374,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { Status: uint8(1), WorkflowID: [32]byte(wfID), WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", + WorkflowName: workflowName, BinaryURL: binaryURL, ConfigURL: configURL, SecretsURL: secretsURL, @@ -347,10 +385,10 @@ func Test_workflowRegisteredHandler(t *testing.T) { require.NoError(t, err) // Verify the record is updated in the database - dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), workflowName) require.NoError(t, err) require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, workflowName, dbSpec.WorkflowName) require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) // Verify there is no running engine @@ -376,7 +414,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { Status: uint8(0), WorkflowID: [32]byte(wfID), WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", + WorkflowName: workflowName, BinaryURL: binaryURL, SecretsURL: secretsURL, } @@ -399,7 +437,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { Status: uint8(0), WorkflowID: [32]byte(wfID), WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", + WorkflowName: workflowName, BinaryURL: binaryURL, ConfigURL: configURL, } @@ -423,7 +461,7 @@ type testCase struct { fetcher FetcherFunc Event func([]byte) WorkflowRegistryWorkflowRegisteredV1 validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) - engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) + engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, hexName string, config []byte, binary []byte) (services.Service, error) } func testRunningWorkflow(t *testing.T, tc testCase) {