From 9e11aeb6457a550b0e0d30ffa68c2fdb5c03d61d Mon Sep 17 00:00:00 2001 From: Sebastian Date: Mon, 30 Dec 2024 17:42:37 +0100 Subject: [PATCH 1/3] fix: race condition with parallel + enable_sharing --- CHANGELOG.md | 6 ++ cmd/terramate/cli/run.go | 36 +++++---- e2etests/core/run_sharing_test.go | 83 +++++++++++++++++++++ run/oncemap.go | 47 ++++++++++++ run/oncemap_test.go | 120 ++++++++++++++++++++++++++++++ 5 files changed, 276 insertions(+), 16 deletions(-) create mode 100644 run/oncemap.go create mode 100644 run/oncemap_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 752e3559d..79f08a6d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,12 @@ Given a version number `MAJOR.MINOR.PATCH`, we increment the: - Backward compatibility in versions `0.0.z` is **not guaranteed** when `z` is increased. - Backward compatibility in versions `0.y.z` is **not guaranteed** when `y` is increased. +## Unreleased + +### Fixed + +- Fix race condition when using `parallel` in combination with `outputs-sharing`. 
+ ## v0.11.5 ### Added diff --git a/cmd/terramate/cli/run.go b/cmd/terramate/cli/run.go index 2af8400f4..700a3661c 100644 --- a/cmd/terramate/cli/run.go +++ b/cmd/terramate/cli/run.go @@ -26,6 +26,7 @@ import ( "github.com/terramate-io/terramate/hcl/ast" "github.com/terramate-io/terramate/printer" prj "github.com/terramate-io/terramate/project" + "github.com/terramate-io/terramate/run" runutil "github.com/terramate-io/terramate/run" "github.com/terramate-io/terramate/run/dag" "github.com/terramate-io/terramate/scheduler" @@ -380,9 +381,7 @@ func (c *cli) runAll( }() // map of stackName -> map of backendName -> outputs - type stackOutputs map[prj.Path]map[string]cty.Value - - allOutputs := stackOutputs{} + allOutputs := run.NewOnceMap[string, *run.OnceMap[string, cty.Value]]() err = sched.Run(func(run stackRun) error { errs := errors.L() @@ -476,12 +475,12 @@ func (c *cli) runAll( } break tasksLoop } - _, ok = allOutputs[otherStack.Dir] - if !ok { - allOutputs[otherStack.Dir] = make(map[string]cty.Value) - } - _, ok = allOutputs[otherStack.Dir][backend.Name] - if !ok { + + stackOutputs, _ := allOutputs.GetOrInit(otherStack.Dir.String(), func() (*runutil.OnceMap[string, cty.Value], error) { + return runutil.NewOnceMap[string, cty.Value](), nil + }) + + outputsVal, err := stackOutputs.GetOrInit(backend.Name, func() (cty.Value, error) { var stdout bytes.Buffer var stderr bytes.Buffer cmd := exec.Command(backend.Command[0], backend.Command[1:]...) 
@@ -492,14 +491,15 @@ func (c *cli) runAll( err := cmd.Run() if err != nil { if !task.MockOnFail { - errs.Append(errors.E(err, "failed to execute: (cmd: %s) (stdout: %s) (stderr: %s)", cmd.String(), stdout.String(), stderr.String())) + err := errors.E(err, "failed to execute: (cmd: %s) (stdout: %s) (stderr: %s)", cmd.String(), stdout.String(), stderr.String()) + errs.Append(err) c.cloudSyncAfter(cloudRun, runResult{ExitCode: -1}, errors.E(ErrRunCommandNotExecuted, err)) releaseResource() failedTaskIndex = taskIndex if !continueOnError { cancel() } - break tasksLoop + return cty.Value{}, err } printer.Stderr.WarnWithDetails( @@ -518,7 +518,8 @@ func (c *cli) runAll( if !continueOnError { cancel() } - break tasksLoop + return cty.Value{}, err + } inputVal, err = json.Unmarshal(stdoutBytes, typ) if err != nil { @@ -530,13 +531,16 @@ func (c *cli) runAll( if !continueOnError { cancel() } - break tasksLoop + return cty.Value{}, err } } - allOutputs[otherStack.Dir][backend.Name] = inputVal + return inputVal, nil + }) + if err != nil { + break tasksLoop } - stackOutputs := allOutputs[otherStack.Dir][backend.Name] - evalctx.SetNamespaceRaw("outputs", stackOutputs) + + evalctx.SetNamespaceRaw("outputs", outputsVal) inputVal, inputErr := input.Value(evalctx) mockVal, mockFound, mockErr := input.Mock(evalctx) diff --git a/e2etests/core/run_sharing_test.go b/e2etests/core/run_sharing_test.go index b6893c4c6..5b554f017 100644 --- a/e2etests/core/run_sharing_test.go +++ b/e2etests/core/run_sharing_test.go @@ -4,6 +4,7 @@ package core_test import ( + "fmt" "path/filepath" "regexp" "testing" @@ -582,3 +583,85 @@ func TestRunSharing(t *testing.T) { } } } + +func TestRunSharingParallel(t *testing.T) { + // This test triggers the race condition described in SC-14248. 
+ t.Parallel() + + layout := []string{ + "f:backend.tm:" + Block("sharing_backend", + Labels("name"), + Expr("type", "terraform"), + Str("filename", "sharing.tf"), + Command("terraform", "output", "-json"), + ).String(), + "s:s1:id=s1", + "f:s1/main.tf:" + Doc( + Block("resource", + Labels("local_file", "s1_file"), + Str("content", "s1_content"), + Str("filename", "${path.module}/file.txt"), + ), + ).String(), + "f:s1/output.tm:" + Output( + Labels("s1_output"), + Str("backend", "name"), + Expr("value", "resource.local_file.s1_file.content"), + ).String(), + } + + for i := 2; i < 5; i++ { + layout = append(layout, + fmt.Sprintf(`s:s%d:after=["/s1"]`, i), + fmt.Sprintf("f:s%d/input.tm:", i)+Input( + Labels(fmt.Sprintf("s%d_input", i)), + Str("backend", "name"), + Expr("value", "outputs.s1_output.value"), + Str("from_stack_id", "s1"), + ).String(), + fmt.Sprintf("f:s%d/main.tf:", i)+Doc( + Block("resource", + Labels("local_file", fmt.Sprintf("s%d_file", i)), + Expr("content", fmt.Sprintf("var.s%d_input", i)), + Str("filename", "${path.module}/file.txt"), + ), + ).String(), + ) + } + + layout = append(layout, + "f:exp.tm:"+Terramate( + Config( + Experiments(hcl.SharingIsCaringExperimentName), + ), + ).String(), + ) + + s := sandbox.New(t) + s.BuildTree(layout) + + tmcli := NewCLI(t, s.RootDir()) + tmcli.PrependToPath(filepath.Dir(TerraformTestPath)) + res := tmcli.Run("run", HelperPath, "echo", "hello") + + if res.Status == 0 { + // generate safeguard must trigger + t.Fatal("run must fail if sharing is not generated") + } + + AssertRunResult(t, tmcli.Run("generate"), RunExpected{ + IgnoreStdout: true, + }) + AssertRunResult(t, tmcli.Run("run", "--quiet", "-X", "terraform", "init"), + RunExpected{ + IgnoreStdout: true, + }, + ) + s.Git().CommitAll("all") + + AssertRunResult(t, tmcli.Run("run", "--quiet", "--enable-sharing", "--mock-on-fail", "--parallel=10", "terraform", "apply", "-auto-approve"), + RunExpected{ + IgnoreStdout: true, + }, + ) +} diff --git 
a/run/oncemap.go b/run/oncemap.go new file mode 100644 index 000000000..e57c6f955 --- /dev/null +++ b/run/oncemap.go @@ -0,0 +1,47 @@ +// Copyright 2023 Terramate GmbH +// SPDX-License-Identifier: MPL-2.0 + +package run + +import "sync" + +// OnceMap contains a map of key/value pairs that is safe for concurrent initialization. +type OnceMap[K ~string, V any] struct { + mtx sync.RWMutex + data map[K]V +} + +// NewOnceMap returns a new empty OnceMap. +func NewOnceMap[K ~string, V any]() *OnceMap[K, V] { + return &OnceMap[K, V]{data: make(map[K]V)} +} + +// GetOrInit obtains a value for the given key if the value exists in the map, or +// initializes it with the given init function. +// This function is safe to be called concurrently. +func (m *OnceMap[K, V]) GetOrInit(k K, init func() (V, error)) (V, error) { + // Read-lock and check if value already exists. + m.mtx.RLock() + v, found := m.data[k] + m.mtx.RUnlock() + + if found { + return v, nil + } + + // If not, write-lock, check again, and maybe initialize it. 
+ m.mtx.Lock() + defer m.mtx.Unlock() + + v, found = m.data[k] + if found { + return v, nil + } + + v, err := init() + if err != nil { + return v, err + } + m.data[k] = v + return v, nil +} diff --git a/run/oncemap_test.go b/run/oncemap_test.go new file mode 100644 index 000000000..bd9a9cc01 --- /dev/null +++ b/run/oncemap_test.go @@ -0,0 +1,120 @@ +// Copyright 2023 Terramate GmbH +// SPDX-License-Identifier: MPL-2.0 + +package run_test + +import ( + "errors" + "fmt" + "strconv" + "sync" + "sync/atomic" + "testing" + + "github.com/madlambda/spells/assert" + "github.com/terramate-io/terramate/run" +) + +func TestOnceMap(t *testing.T) { + t.Parallel() + + t.Run("ok", func(t *testing.T) { + t.Parallel() + + outerCount := 0 + innerCount := 0 + + data := run.NewOnceMap[string, *run.OnceMap[string, string]]() + + for outer := 0; outer < 10; outer++ { + k1 := strconv.Itoa(outer) + v1, err1 := data.GetOrInit(k1, func() (*run.OnceMap[string, string], error) { + outerCount++ + return run.NewOnceMap[string, string](), nil + }) + assert.NoError(t, err1) + + for inner := 0; inner < 10; inner++ { + k2 := strconv.Itoa(inner) + v2, err2 := v1.GetOrInit(k2, func() (string, error) { + innerCount++ + return fmt.Sprintf("%d_%d", outer, inner), nil + }) + assert.NoError(t, err2) + assert.EqualStrings(t, fmt.Sprintf("%d_%d", outer, inner), v2) + } + } + + assert.EqualInts(t, 10, outerCount, "outer count") + assert.EqualInts(t, 100, innerCount, "inner count") + }) + + t.Run("error", func(t *testing.T) { + t.Parallel() + + count := 0 + + m := run.NewOnceMap[string, string]() + + _, err := m.GetOrInit("k", func() (string, error) { + count++ + return "", errors.New("failed") + }) + + assert.EqualErrs(t, err, errors.New("failed")) + + v, err := m.GetOrInit("k", func() (string, error) { + count++ + return "success", nil + }) + + assert.EqualStrings(t, "success", v) + assert.NoError(t, err) + }) + + t.Run("concurrent", func(t *testing.T) { + t.Parallel() + + var outerCount atomic.Int32 + var 
innerCount atomic.Int32 + + data := run.NewOnceMap[string, *run.OnceMap[string, string]]() + + var wg sync.WaitGroup + + for outer := 0; outer < 10; outer++ { + outer := outer + + wg.Add(1 + 10) + + go func() { + defer wg.Done() + + k1 := strconv.Itoa(outer) + v1, _ := data.GetOrInit(k1, func() (*run.OnceMap[string, string], error) { + outerCount.Add(1) + return run.NewOnceMap[string, string](), nil + }) + + for inner := 0; inner < 10; inner++ { + inner := inner + + go func() { + defer wg.Done() + + k2 := strconv.Itoa(inner) + _, _ = v1.GetOrInit(k2, func() (string, error) { + innerCount.Add(1) + return "blah", nil + }) + }() + } + }() + } + + wg.Wait() + + assert.EqualInts(t, 10, int(outerCount.Load()), "outer count") + assert.EqualInts(t, 100, int(innerCount.Load()), "inner inner") + }) +} From d5c08ea9b10d7f620f55335a9d37a38e361acdfb Mon Sep 17 00:00:00 2001 From: Sebastian Date: Thu, 2 Jan 2025 11:04:31 +0100 Subject: [PATCH 2/3] chore: bump dev version --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 62d5dbdf3..47317ee5c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.11.5 +0.11.6-dev From ae19148edfc3a4eb63bb01f877299e8070f0e610 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Thu, 2 Jan 2025 11:17:19 +0100 Subject: [PATCH 3/3] chore: upgrade to opentofu/setup-opentofu v1.0.5 --- .github/workflows/ci-experimental.yml | 2 +- .github/workflows/ci-sync-deployment.yml | 2 +- .github/workflows/ci-sync-preview.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-experimental.yml b/.github/workflows/ci-experimental.yml index 2f9683e04..fd9db72a3 100644 --- a/.github/workflows/ci-experimental.yml +++ b/.github/workflows/ci-experimental.yml @@ -26,7 +26,7 @@ jobs: with: go-version: ${{ matrix.go }} - - uses: opentofu/setup-opentofu@12f4debbf681675350b6cd1f0ff8ecfbda62027b # pin@v1 + - uses: opentofu/setup-opentofu@592200bd4b9bbf4772ace78f887668b1aee8f716 # pin@v1.0.5 with: 
tofu_version: 1.6.2 tofu_wrapper: false diff --git a/.github/workflows/ci-sync-deployment.yml b/.github/workflows/ci-sync-deployment.yml index 18c122b8e..dc54f69a2 100644 --- a/.github/workflows/ci-sync-deployment.yml +++ b/.github/workflows/ci-sync-deployment.yml @@ -40,7 +40,7 @@ jobs: with: go-version: ${{ matrix.go }} - - uses: opentofu/setup-opentofu@12f4debbf681675350b6cd1f0ff8ecfbda62027b # pin@v1 + - uses: opentofu/setup-opentofu@592200bd4b9bbf4772ace78f887668b1aee8f716 # pin@v1.0.5 with: tofu_version: 1.6.2 tofu_wrapper: false diff --git a/.github/workflows/ci-sync-preview.yml b/.github/workflows/ci-sync-preview.yml index c5d18a097..740348123 100644 --- a/.github/workflows/ci-sync-preview.yml +++ b/.github/workflows/ci-sync-preview.yml @@ -41,7 +41,7 @@ jobs: - name: check all packages with tests are Terramate Stacks run: ./hack/check-stacks.sh - - uses: opentofu/setup-opentofu@12f4debbf681675350b6cd1f0ff8ecfbda62027b # pin@v1 + - uses: opentofu/setup-opentofu@592200bd4b9bbf4772ace78f887668b1aee8f716 # pin@v1.0.5 with: tofu_version: 1.6.2 tofu_wrapper: false