From db942731b1e3a6d235e5039d87590742b57d5956 Mon Sep 17 00:00:00 2001 From: Evan Cordell Date: Thu, 31 Aug 2023 21:41:36 -0400 Subject: [PATCH] implement a locking version of the dual write workflow this also removes the failpoint library in favor of a quick local version that doesn't require transforming the codebase. --- e2e/e2e_test.go | 5 +- e2e/go.mod | 8 +- e2e/go.sum | 7 - e2e/proxy_test.go | 285 ++++++++++++++++++++----------- e2e/util_test.go | 36 ++++ go.mod | 4 +- go.sum | 7 - magefiles/go.mod | 2 - magefiles/go.sum | 9 - magefiles/test.go | 39 +---- magefiles/tools.go | 1 - pkg/failpoints/failpoints_off.go | 7 + pkg/failpoints/failpoints_on.go | 29 ++++ pkg/proxy/authz.go | 18 +- pkg/proxy/durable.go | 182 -------------------- pkg/proxy/durable_activities.go | 62 +++++++ pkg/proxy/durable_workflows.go | 267 +++++++++++++++++++++++++++++ pkg/proxy/server.go | 44 +++-- pkg/spicedb/bootstrap.yaml | 4 + 19 files changed, 638 insertions(+), 378 deletions(-) create mode 100644 e2e/util_test.go create mode 100644 pkg/failpoints/failpoints_off.go create mode 100644 pkg/failpoints/failpoints_on.go delete mode 100644 pkg/proxy/durable.go create mode 100644 pkg/proxy/durable_activities.go create mode 100644 pkg/proxy/durable_workflows.go diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 0a6a638..c3109f2 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -40,7 +40,8 @@ import ( ) var ( - testEnv *envtest.Environment + testEnv *envtest.Environment + proxySrv *proxy.Server // adminUser is configured for the un-proxied apiserver adminUser *envtest.AuthenticatedUser @@ -107,7 +108,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { opts.SecureServing.BindAddress = net.ParseIP("127.0.0.1") opts.Authentication.BuiltInOptions.ClientCert.ClientCA = clientCA.Path() Expect(opts.Complete(ctx)).To(Succeed()) - proxySrv, err := proxy.NewServer(ctx, *opts) + proxySrv, err = proxy.NewServer(ctx, *opts) Expect(err).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) diff --git a/e2e/go.mod b/e2e/go.mod index 5b47c49..3bb3e69 100644 --- a/e2e/go.mod +++ b/e2e/go.mod @@ -5,6 +5,8 @@ go 1.20 replace github.com/authzed/spicedb-kubeapi-proxy => ../ require ( + github.com/authzed/authzed-go v0.9.1-0.20230810180432-2fb0fd4c66dd + github.com/authzed/spicedb v1.24.1-0.20230821163419-e4bb3adfd50b github.com/authzed/spicedb-kubeapi-proxy v0.0.0-00010101000000-000000000000 github.com/go-logr/zapr v1.2.4 github.com/onsi/ginkgo/v2 v2.11.0 @@ -14,6 +16,7 @@ require ( go.uber.org/zap v1.24.0 k8s.io/api v0.28.0 k8s.io/apimachinery v0.28.0 + k8s.io/apiserver v0.28.0 k8s.io/client-go v0.28.0 sigs.k8s.io/controller-runtime v0.15.1 sigs.k8s.io/controller-runtime/tools/setup-envtest v0.0.0-20230817155522-304027bcbe4b @@ -33,11 +36,9 @@ require ( github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect - github.com/authzed/authzed-go v0.9.1-0.20230810180432-2fb0fd4c66dd // indirect github.com/authzed/cel-go v0.17.5 // indirect github.com/authzed/consistent v0.1.0 // indirect github.com/authzed/grpcutil v0.0.0-20230703173955-bdd0ac3f16a5 // indirect - github.com/authzed/spicedb v1.24.1-0.20230821163419-e4bb3adfd50b // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect @@ -139,8 +140,6 @@ require ( github.com/outcaste-io/ristretto v0.2.3 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pelletier/go-toml/v2 v2.0.9 // indirect - github.com/pingcap/errors v0.11.4 // indirect - github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pquerna/cachecontrol v0.1.0 // indirect @@ -208,7 +207,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.27.2 // indirect - k8s.io/apiserver v0.28.0 // indirect k8s.io/cloud-provider v0.0.0 // indirect k8s.io/cluster-bootstrap v0.0.0 // indirect k8s.io/component-base v0.28.0 // indirect diff --git a/e2e/go.sum b/e2e/go.sum index cf4a7db..aee69f0 100644 --- a/e2e/go.sum +++ b/e2e/go.sum @@ -452,10 +452,6 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2D github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0= github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= -github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -503,7 +499,6 @@ github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs= github.com/sean-/sysexits v1.0.0 h1:FLf1xcUTBzTqUI1Nc77UwYPcoWgDM09lyMTt8+QCpbE= github.com/sean-/sysexits v1.0.0/go.mod h1:yRz1mwglmPHOlAm3+WGr40EV8qFg4hn8GE9MoNwoecg= -github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -980,7 +975,6 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -995,7 +989,6 @@ gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/e2e/proxy_test.go b/e2e/proxy_test.go index 86b7ea9..3c7f73b 100644 --- a/e2e/proxy_test.go +++ b/e2e/proxy_test.go @@ -4,11 +4,11 @@ package e2e import ( "context" - "fmt" + "sync" + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/pingcap/failpoint" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -16,10 +16,13 @@ import ( "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + + "github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints" + "github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy" ) var _ = Describe("Proxy", func() { - Describe("with two users", func() { + When("there are two users", func() { var paulClient, chaniClient, adminClient kubernetes.Interface var paulNamespace, chaniNamespace string @@ -46,125 +49,211 @@ var _ = Describe("Proxy", func() { }) AfterEach(func(ctx context.Context) { - _ = adminClient.CoreV1().Namespaces().Delete(ctx, paulNamespace, metav1.DeleteOptions{}) - _ = adminClient.CoreV1().Namespaces().Delete(ctx, chaniNamespace, metav1.DeleteOptions{}) - }) + orphan := metav1.DeletePropagationOrphan + _ = adminClient.CoreV1().Namespaces().Delete(ctx, paulNamespace, metav1.DeleteOptions{PropagationPolicy: &orphan}) + _ = adminClient.CoreV1().Namespaces().Delete(ctx, chaniNamespace, metav1.DeleteOptions{PropagationPolicy: &orphan}) - It("doesn't show users namespaces the other has created", func(ctx context.Context) { - // not created yet, neither can access - _, err := paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + // ensure there are no remaining locks + Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{ + ResourceType: "lock", + OptionalRelation: "workflow", + OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "workflow"}, + }))).To(BeZero()) + }) - // each creates their respective namespace - _, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: paulNamespace}, + CreateNamespace := func(ctx context.Context, client kubernetes.Interface, namespace string) error { + _, err := client.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: namespace}, }, metav1.CreateOptions{}) + return err + } + GetNamespace := func(ctx context.Context, client kubernetes.Interface, namespace string) error { + _, err := client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + return err + } + ListNamespaces := func(ctx context.Context, client kubernetes.Interface) []string { + visibleNamespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) Expect(err).To(Succeed()) + return lo.Map(visibleNamespaces.Items, func(item corev1.Namespace, index int) string { + return item.Name + }) + } - _, err = chaniClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace}, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + JustBeforeEach(func(ctx context.Context) { + // before every test, assert no access + Expect(k8serrors.IsNotFound(GetNamespace(ctx, paulClient, paulNamespace))).To(BeTrue()) + Expect(k8serrors.IsNotFound(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue()) + Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, paulNamespace))).To(BeTrue()) + Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) + }) - // each can get their respective namespace - _, err = paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{}) - Expect(err).To(Succeed()) - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(err).To(Succeed()) + AssertDualWriteBehavior := func() { + It("doesn't show users namespaces the other has created", func(ctx context.Context) { + // each creates their respective namespace + Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) + Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) - // neither can get each other's namespace - out, err := paulClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - fmt.Println(out) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + // each can get their respective namespace + Expect(GetNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) + Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) - // neither can see each other's namespace in the list - paulVisibleNamespaces, err := paulClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) - Expect(err).To(Succeed()) - paulVisibleNamespaceNames := lo.Map(paulVisibleNamespaces.Items, func(item corev1.Namespace, index int) string { - return item.Name + // neither can get each other's namespace + Expect(k8serrors.IsNotFound(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue()) + Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, paulNamespace))).To(BeTrue()) + + // neither can see each other's namespace in the list + paulList := ListNamespaces(ctx, paulClient) + chaniList := ListNamespaces(ctx, chaniClient) + Expect(paulList).ToNot(ContainElement(chaniNamespace)) + Expect(paulList).To(ContainElement(paulNamespace)) + Expect(chaniList).ToNot(ContainElement(paulNamespace)) + Expect(chaniList).To(ContainElement(chaniNamespace)) }) - Expect(paulVisibleNamespaceNames).To(ContainElement(paulNamespace)) - chaniVisibleNamespaces, err := chaniClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) - Expect(err).To(Succeed()) - chaniVisibleNamespaceNames := lo.Map(chaniVisibleNamespaces.Items, func(item corev1.Namespace, index int) string { - return item.Name + It("cleans up dual writes on kube failures", func(ctx context.Context) { + // paul creates his namespace + Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) + + // make kube write fail for chani's namespace, spicedb write will have + // succeeded + if proxySrv.LockMode == proxy.LockingWriteToSpiceDBAndKube { + // the locking version retries if the connection fails + failpoints.EnableFailPoint("panicKubeWrite", proxy.MaxKubeAttempts+1) + } else { + failpoints.EnableFailPoint("panicKubeWrite", 1) + } + Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil()) + + // paul creates chani's namespace + Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + + // paul can get get both namespaces + Expect(GetNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) + Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + + // chani can't get her namespace - this indicates the spicedb write was rolled back + // from the failed dual write above + Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) }) - Expect(chaniVisibleNamespaceNames).To(ContainElement(chaniNamespace)) - }) - It("cleans up dual writes on errors", func(ctx context.Context) { - // not created yet, neither can access - _, err := paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + It("recovers dual writes when kube write succeeds but crashes", func(ctx context.Context) { + // paul creates his namespace + Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) - // paul creates his namespace - _, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: paulNamespace}, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + // make kube write succeed, but crash process before it can be recorded + failpoints.EnableFailPoint("panicKubeReadResp", 1) + Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil()) - // make kube write fail - err = failpoint.Enable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeWrite", "panic") - Expect(err).To(Succeed()) + // Chani can get her namespace - the workflow has resolve the write + // Pessimistic locking retried the kube request and got an "already exists" err + // Optimistic locking checked kube and saw that the object already existed + Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) + }) - _, err = chaniClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace}, - }, metav1.CreateOptions{}) - Expect(err).ToNot(BeNil()) + It("recovers dual writes when spicedb write failures", func(ctx context.Context) { + // paul creates his namespace + Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) - // paul creates chani's namespace - Expect(failpoint.Disable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeWrite")).To(Succeed()) - _, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace}, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + // make spicedb write crash on chani's namespace write + failpoints.EnableFailPoint("panicWriteSpiceDB", 1) + Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil()) - // paul can get get both namespaces - _, err = paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{}) - Expect(err).To(Succeed()) - _, err = paulClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(err).To(Succeed()) + // paul creates chani's namespace so that the namespace exists + Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) - // chani can't get her namespace - this indicates the spicedb write was rolled back - // from the failed dual write above - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) - }) + // check that chani can't get her namespace, indirectly showing + // that the spicedb write was rolled back + Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) - It("recovers dual writes when it crashes", func(ctx context.Context) { - // not created yet, neither can access - _, err := paulClient.CoreV1().Namespaces().Get(ctx, paulNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + // confirm the relationship doesn't exist + Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{ + ResourceType: "namespace", + OptionalResourceId: chaniNamespace, + OptionalRelation: "creator", + OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"}, + }))).To(BeZero()) + }) - // paul creates his namespace - _, err = paulClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: paulNamespace}, - }, metav1.CreateOptions{}) - Expect(err).To(Succeed()) + It("recovers dual writes when spicedb write succeeds but crashes", func(ctx context.Context) { + // paul creates his namespace + Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) - // make kube write succeed, but crash process before it can be recorded - err = failpoint.Enable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeReadResp", "panic") - Expect(err).To(Succeed()) + // make spicedb write crash on chani's namespace write + failpoints.EnableFailPoint("panicSpiceDBReadResp", 1) + err := CreateNamespace(ctx, chaniClient, chaniNamespace) + Expect(err).ToNot(BeNil()) + // pessimistic locking reports a conflict, optimistic locking reports already exists + Expect(k8serrors.IsConflict(err) || k8serrors.IsAlreadyExists(err)).To(BeTrue()) - _, err = chaniClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: chaniNamespace}, - }, metav1.CreateOptions{}) - Expect(err).ToNot(BeNil()) + // paul creates chani's namespace so that the namespace exists + Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) - Expect(failpoint.Disable("github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/panicKubeReadResp")).To(Succeed()) + // check that chani can't get her namespace, indirectly showing + // that the spicedb write was rolled back + Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) - // Chani can get her namespace - _, err = chaniClient.CoreV1().Namespaces().Get(ctx, chaniNamespace, metav1.GetOptions{}) - Expect(err).To(Succeed()) + // confirm the relationship doesn't exist + Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{ + ResourceType: "namespace", + OptionalResourceId: chaniNamespace, + OptionalRelation: "creator", + OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"}, + }))).To(BeZero()) + }) + + It("ensures only one write at a time happens for a given object", func(ctx context.Context) { + // both attempt to create the namespace + start := make(chan struct{}) + errs := make(chan error, 2) + + var wg sync.WaitGroup + wg.Add(2) + + // in theory, these two requests could be run serially, but in + // practice they seem to always actually run in parallel as + // intended. + go func() { + defer GinkgoRecover() + <-start + errs <- CreateNamespace(ctx, paulClient, paulNamespace) + wg.Done() + }() + go func() { + defer GinkgoRecover() + <-start + errs <- CreateNamespace(ctx, chaniClient, paulNamespace) + wg.Done() + }() + start <- struct{}{} + start <- struct{}{} + wg.Wait() + close(errs) + + allErrs := make([]error, 0) + for err := range errs { + if err != nil { + allErrs = append(allErrs, err) + } + } + Expect(len(allErrs)).ToNot(BeZero()) + Expect(k8serrors.IsConflict(allErrs[0]) || // pessimistic lock + k8serrors.IsAlreadyExists(allErrs[0]), // optimistic lock + ).To(BeTrue()) + }) + } + + When("optimistic locking is used", func() { + BeforeEach(func() { + proxySrv.LockMode = proxy.OptimisticWriteToSpiceDBAndKube + }) + AssertDualWriteBehavior() + }) + + When("pessimistic locking is used", func() { + BeforeEach(func() { + proxySrv.LockMode = proxy.LockingWriteToSpiceDBAndKube + }) + AssertDualWriteBehavior() }) }) }) diff --git a/e2e/util_test.go b/e2e/util_test.go new file mode 100644 index 0000000..aab3faf --- /dev/null +++ b/e2e/util_test.go @@ -0,0 +1,36 @@ +//go:build e2e + +package e2e + +import ( + "context" + "io" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/authzed/spicedb/pkg/tuple" + . "github.com/onsi/gomega" + "github.com/samber/lo" +) + +// GetAllTuples collects all tuples matching the filter from SpiceDB +func GetAllTuples(ctx context.Context, filter *v1.RelationshipFilter) []*v1.ReadRelationshipsResponse { + client, err := proxySrv.SpiceDBClient.ReadRelationships(ctx, &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}}, + RelationshipFilter: filter, + }) + Expect(err).To(Succeed()) + results := make([]*v1.ReadRelationshipsResponse, 0) + for resp, err := client.Recv(); err != io.EOF; resp, err = client.Recv() { + Expect(err).To(Succeed()) + results = append(results, resp) + } + return results +} + +// RelRespToStrings converts a slice of *v1.ReadRelationshipsResponse to a slice +// of tuple strings. +func RelRespToStrings(relResps []*v1.ReadRelationshipsResponse) []string { + return lo.Map(relResps, func(item *v1.ReadRelationshipsResponse, _ int) string { + return tuple.MustRelString(item.Relationship) + }) +} diff --git a/go.mod b/go.mod index e6eceb0..e66065f 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,9 @@ go 1.20 require ( github.com/authzed/authzed-go v0.9.1-0.20230810180432-2fb0fd4c66dd github.com/authzed/spicedb v1.24.1-0.20230821163419-e4bb3adfd50b + github.com/cespare/xxhash/v2 v2.2.0 github.com/dustin/go-humanize v1.0.1 github.com/microsoft/durabletask-go v0.3.0 - github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace google.golang.org/grpc v1.56.2 @@ -54,7 +54,6 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudspannerecosystem/spanner-change-streams-tail v0.3.1 // indirect github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 // indirect @@ -141,7 +140,6 @@ require ( github.com/outcaste-io/ristretto v0.2.3 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pelletier/go-toml/v2 v2.0.9 // indirect - github.com/pingcap/errors v0.11.4 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pquerna/cachecontrol v0.1.0 // indirect diff --git a/go.sum b/go.sum index 2d65d9a..50b632c 100644 --- a/go.sum +++ b/go.sum @@ -446,10 +446,6 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2D github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0= github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= -github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -497,7 +493,6 @@ github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs= github.com/sean-/sysexits v1.0.0 h1:FLf1xcUTBzTqUI1Nc77UwYPcoWgDM09lyMTt8+QCpbE= github.com/sean-/sysexits v1.0.0/go.mod h1:yRz1mwglmPHOlAm3+WGr40EV8qFg4hn8GE9MoNwoecg= -github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -974,7 +969,6 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -989,7 +983,6 @@ gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/magefiles/go.mod b/magefiles/go.mod index 567da6a..a3aff1d 100644 --- a/magefiles/go.mod +++ b/magefiles/go.mod @@ -8,7 +8,6 @@ require ( github.com/magefile/mage v1.15.0 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 - github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 k8s.io/apimachinery v0.28.0-beta.0 k8s.io/client-go v0.28.0-beta.0 @@ -107,7 +106,6 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/sergi/go-diff v1.1.0 // indirect github.com/sigstore/cosign/v2 v2.0.3-0.20230523133326-0544abd8fc8a // indirect github.com/sigstore/rekor v1.2.0 // indirect github.com/sigstore/sigstore v1.6.4 // indirect diff --git a/magefiles/go.sum b/magefiles/go.sum index 93aac33..a87de73 100644 --- a/magefiles/go.sum +++ b/magefiles/go.sum @@ -448,9 +448,6 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= -github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= -github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -469,8 +466,6 @@ github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= -github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sigstore/cosign/v2 v2.0.3-0.20230523133326-0544abd8fc8a h1:4j4hrwYblDkNouA2fZ/hKvtJhV/N+jJGhLoRXUNLYmE= github.com/sigstore/cosign/v2 v2.0.3-0.20230523133326-0544abd8fc8a/go.mod h1:em8IHAamkOMXzXHjHx5NdLO1d8erWDMlGRlx0XE5TtI= github.com/sigstore/rekor v1.2.0 h1:ahlnoEY3zo8Vc+eZLPobamw6YfBTAbI0lthzUQd6qe4= @@ -549,8 +544,6 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME= go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= -go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= @@ -771,7 +764,6 @@ golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -914,7 +906,6 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/magefiles/test.go b/magefiles/test.go index da2c3fe..c8fb2fb 100644 --- a/magefiles/test.go +++ b/magefiles/test.go @@ -4,8 +4,6 @@ package main import ( - "fmt" - "github.com/magefile/mage/mg" ) @@ -18,47 +16,14 @@ func (Test) Unit() error { // E2e runs the end-to-end tests against a real apiserver. func (t Test) E2e() error { - var f Failpoints - if err := f.Enable(); err != nil { - return err - } - testErr := RunSh("go", Tool())( + return RunSh("go", Tool())( "run", "github.com/onsi/ginkgo/v2/ginkgo", - "--tags=e2e", + "--tags=e2e,failpoints", "-r", "-vv", "--fail-fast", "--randomize-all", "../e2e", ) - if err := f.Disable(); err != nil { - fmt.Println(err) - } - return testErr -} - -type Failpoints mg.Namespace - -// Enable transforms the codebase to inject failpoints. -// Generally this is only called by the e2e task, but you can run it manually -// to look at generated code or to leave the codebase in a state ready for e2e -// (i.e. to use a debugger). -func (Failpoints) Enable() error { - return RunSh("go", Tool())( - "run", - "github.com/pingcap/failpoint/failpoint-ctl", - "enable", - "../", - ) -} - -// Disable transforms the codebase to remove failpoint injection. -func (Failpoints) Disable() error { - return RunSh("go", Tool())( - "run", - "github.com/pingcap/failpoint/failpoint-ctl", - "disable", - "../", - ) } diff --git a/magefiles/tools.go b/magefiles/tools.go index 4907b83..faf3c4f 100644 --- a/magefiles/tools.go +++ b/magefiles/tools.go @@ -6,5 +6,4 @@ package tools import ( _ "filippo.io/mkcert" _ "github.com/onsi/ginkgo/v2/ginkgo" - _ "github.com/pingcap/failpoint/failpoint-ctl" ) diff --git a/pkg/failpoints/failpoints_off.go b/pkg/failpoints/failpoints_off.go new file mode 100644 index 0000000..9232cff --- /dev/null +++ b/pkg/failpoints/failpoints_off.go @@ -0,0 +1,7 @@ +//go:build !failpoints +// +build !failpoints + +package failpoints + +// FailPoint does nothing in non-test builds +func FailPoint(_ string) {} diff --git a/pkg/failpoints/failpoints_on.go b/pkg/failpoints/failpoints_on.go new file mode 100644 index 0000000..2d88a1c --- /dev/null +++ b/pkg/failpoints/failpoints_on.go @@ -0,0 +1,29 @@ +//go:build failpoints +// +build failpoints + +package failpoints + +import "sync" + +var failpoints sync.Map + +// FailPoint will panic at its callsite if the name is in the list of enabled +// failpoints. +func FailPoint(failpoint string) { + if n, ok := failpoints.Load(failpoint); ok { + remaining := n.(int8) + if remaining > 0 { + remaining-- + failpoints.Store(failpoint, remaining) + panic(failpoint) + } else { + failpoints.Delete(n) + } + } +} + +// EnableFailPoint enables a failpoint for n calls. After n calls, the failpoint +// is disabled. +func EnableFailPoint(failpoint string, n int8) { + failpoints.Store(failpoint, n) +} diff --git a/pkg/proxy/authz.go b/pkg/proxy/authz.go index 733a9a2..6d3e775 100644 --- a/pkg/proxy/authz.go +++ b/pkg/proxy/authz.go @@ -26,7 +26,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" ) -func withAuthorization(handler, failed http.Handler, spicedbClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient) http.Handler { +func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -67,7 +67,7 @@ func withAuthorization(handler, failed http.Handler, spicedbClient v1.Permission return } - id, err := taskHubClient.ScheduleNewOrchestration(ctx, WriteToSpiceDBAndKubeName, api.WithInput(CreateObjInput{ + id, err := taskHubClient.ScheduleNewOrchestration(ctx, s.LockMode, api.WithInput(CreateObjInput{ RequestInfo: requestInfo, UserInfo: userInfo.(*user.DefaultInfo), ObjectMeta: &pom.ObjectMeta, @@ -84,13 +84,6 @@ func withAuthorization(handler, failed http.Handler, spicedbClient v1.Permission failed.ServeHTTP(w, req) return } - // TODO: needed? - if metadata.FailureDetails != nil { - fmt.Println(metadata.FailureDetails.GetErrorMessage()) - failed.ServeHTTP(w, req) - return - } - req.Body = io.NopCloser(bytes.NewBuffer(body)) fmt.Println("pom", pom) @@ -137,7 +130,7 @@ func withAuthorization(handler, failed http.Handler, spicedbClient v1.Permission requestInfo.APIGroup == "" && requestInfo.Name != "" { go func() { - cr, err := spicedbClient.CheckPermission(ctx, &v1.CheckPermissionRequest{ + cr, err := s.SpiceDBClient.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true}, }, @@ -180,7 +173,7 @@ func withAuthorization(handler, failed http.Handler, spicedbClient v1.Permission requestInfo.APIGroup == "" { go func() { - lr, err := spicedbClient.LookupResources(ctx, &v1.LookupResourcesRequest{ + lr, err := s.SpiceDBClient.LookupResources(ctx, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true}, }, @@ -253,7 +246,7 @@ func withAuthorization(handler, failed http.Handler, spicedbClient v1.Permission for _, u := range resp.Updates { if u.Operation == v1.RelationshipUpdate_OPERATION_TOUCH || u.Operation == v1.RelationshipUpdate_OPERATION_CREATE { // do a check - cr, err := spicedbClient.CheckPermission(ctx, &v1.CheckPermissionRequest{ + cr, err := s.SpiceDBClient.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, // TODO @@ -613,7 +606,6 @@ func (d *AuthzData) FilterList(body []byte) ([]byte, error) { list.Items = allowedItems return json.Marshal(list) - } func (d *AuthzData) FilterObject(pom *metav1.PartialObjectMetadata, body []byte) ([]byte, error) { diff --git a/pkg/proxy/durable.go b/pkg/proxy/durable.go deleted file mode 100644 index d9df75a..0000000 --- a/pkg/proxy/durable.go +++ /dev/null @@ -1,182 +0,0 @@ -package proxy - -import ( - v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/microsoft/durabletask-go/task" - "github.com/pingcap/failpoint" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" -) - -const WriteToSpiceDBAndKubeName = "WriteToSpiceDBAndKube" - -type CreateObjInput struct { - RequestInfo *request.RequestInfo - UserInfo *user.DefaultInfo - ObjectMeta *metav1.ObjectMeta - Body []byte -} - -type KubeReqInput struct { - RequestInfo *request.RequestInfo - ObjectMeta *metav1.ObjectMeta - Body []byte -} - -type KubeResp struct { - Body []byte - ContentType string - StatusCode int -} - -// WriteToSpiceDBAndKube ensures that a write exists in both SpiceDB and kube, -// or neither. -func (s *Server) WriteToSpiceDBAndKube(ctx *task.OrchestrationContext) (any, error) { - var input CreateObjInput - if err := ctx.GetInput(&input); err != nil { - return nil, err - } - - var resp v1.WriteRelationshipsResponse - if err := ctx.CallActivity(s.WriteToSpiceDB, task.WithActivityInput(&v1.WriteRelationshipsRequest{ - Updates: []*v1.RelationshipUpdate{{ - Operation: v1.RelationshipUpdate_OPERATION_CREATE, - Relationship: &v1.Relationship{ - Resource: &v1.ObjectReference{ - ObjectType: "namespace", - ObjectId: input.ObjectMeta.Name, - }, - Relation: "creator", - Subject: &v1.SubjectReference{ - Object: &v1.ObjectReference{ - ObjectType: "user", - ObjectId: input.UserInfo.GetName(), - }, - }, - }, - }}, - })).Await(&resp); err != nil { - return nil, err - } - - var out KubeResp - if err := ctx.CallActivity(s.WriteToKube, task.WithActivityInput(&KubeReqInput{ - RequestInfo: input.RequestInfo, - ObjectMeta: input.ObjectMeta, - Body: input.Body, - })).Await(&out); err != nil { - // if there's an error, might need to rollback the spicedb write - - // check if object exists - we might have failed the write task but - // succeeded in writing to kube - var exists bool - if err := ctx.CallActivity(s.CheckKube, task.WithActivityInput(&KubeReqInput{ - RequestInfo: input.RequestInfo, - ObjectMeta: input.ObjectMeta, - Body: input.Body, - })).Await(&exists); err != nil { - return nil, err - } - - // if the object doesn't exist, clean up the spicedb write - if !exists { - if err := ctx.CallActivity(s.WriteToSpiceDB, task.WithActivityInput(&v1.WriteRelationshipsRequest{ - Updates: []*v1.RelationshipUpdate{{ - Operation: v1.RelationshipUpdate_OPERATION_DELETE, - Relationship: &v1.Relationship{ - Resource: &v1.ObjectReference{ - ObjectType: "namespace", - ObjectId: input.ObjectMeta.Name, - }, - Relation: "creator", - Subject: &v1.SubjectReference{ - Object: &v1.ObjectReference{ - ObjectType: "user", - ObjectId: input.UserInfo.GetName(), - }, - }, - }, - }}, - })).Await(&resp); err != nil { - return nil, err - } - return nil, err - } - } - return out, nil -} - -func (s *Server) WriteToSpiceDB(ctx task.ActivityContext) (any, error) { - var req v1.WriteRelationshipsRequest - if err := ctx.GetInput(&req); err != nil { - return nil, err - } - return s.spiceDBClient.WriteRelationships(ctx.Context(), &req) -} - -func (s *Server) WriteToKube(ctx task.ActivityContext) (any, error) { - restConfig, err := clientcmd.NewDefaultClientConfig(*s.opts.BackendConfig, nil).ClientConfig() - if err != nil { - return nil, err - } - client, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, err - } - - var req KubeReqInput - if err := ctx.GetInput(&req); err != nil { - return nil, err - } - - failpoint.Inject("panicKubeWrite", func() { - panic("failed to write to kube") - }) - - kreq := client.RESTClient().Post(). - RequestURI(req.RequestInfo.Path). - Body(req.Body) - if len(req.RequestInfo.Namespace) > 0 { - kreq = kreq.Namespace(req.RequestInfo.Namespace) - } - res := kreq.Do(ctx.Context()) - - failpoint.Inject("panicKubeReadResp", func() { - panic("write succeeded, but crashed before the status could be read") - }) - - body, err := res.Raw() - if err != nil { - return nil, err - } - resp := KubeResp{ - Body: body, - } - res.StatusCode(&resp.StatusCode) - res.ContentType(&resp.ContentType) - return resp, nil -} - -func (s *Server) CheckKube(ctx task.ActivityContext) (any, error) { - restConfig, err := clientcmd.NewDefaultClientConfig(*s.opts.BackendConfig, nil).ClientConfig() - if err != nil { - return nil, err - } - client, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, err - } - - var req KubeReqInput - if err := ctx.GetInput(&req); err != nil { - return nil, err - } - - // TODO: this is somewhat janky - res := client.RESTClient().Get().RequestURI(req.RequestInfo.Path + "/" + req.ObjectMeta.GetName()).Do(ctx.Context()) - return !k8serrors.IsNotFound(res.Error()), nil -} diff --git a/pkg/proxy/durable_activities.go b/pkg/proxy/durable_activities.go new file mode 100644 index 0000000..8637505 --- /dev/null +++ b/pkg/proxy/durable_activities.go @@ -0,0 +1,62 @@ +package proxy + +import ( + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/microsoft/durabletask-go/task" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + + "github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints" +) + +// WriteToSpiceDBActivity writes relationships to spicedb and returns any errors. +func (s *Server) WriteToSpiceDB(ctx task.ActivityContext) (any, error) { + var req v1.WriteRelationshipsRequest + if err := ctx.GetInput(&req); err != nil { + return nil, err + } + failpoints.FailPoint("panicWriteSpiceDB") + out, err := s.SpiceDBClient.WriteRelationships(ctx.Context(), &req) + failpoints.FailPoint("panicSpiceDBReadResp") + return out, err +} + +// WriteToKubeActivity +func (s *Server) WriteToKube(ctx task.ActivityContext) (any, error) { + var req KubeReqInput + if err := ctx.GetInput(&req); err != nil { + return nil, err + } + + failpoints.FailPoint("panicKubeWrite") + + kreq := s.KubeClient.RESTClient().Post(). + RequestURI(req.RequestInfo.Path). + Body(req.Body) + if len(req.RequestInfo.Namespace) > 0 { + kreq = kreq.Namespace(req.RequestInfo.Namespace) + } + res := kreq.Do(ctx.Context()) + + failpoints.FailPoint("panicKubeReadResp") + + resp := KubeResp{} + body, err := res.Raw() + if kerr, ok := err.(*k8serrors.StatusError); ok { + resp.Err = *kerr + } + resp.Body = body + res.StatusCode(&resp.StatusCode) + res.ContentType(&resp.ContentType) + return resp, nil +} + +func (s *Server) CheckKube(ctx task.ActivityContext) (any, error) { + var req KubeReqInput + if err := ctx.GetInput(&req); err != nil { + return nil, err + } + + // TODO: this is somewhat janky + res := s.KubeClient.RESTClient().Get().RequestURI(req.RequestInfo.Path + "/" + req.ObjectMeta.GetName()).Do(ctx.Context()) + return !k8serrors.IsNotFound(res.Error()), nil +} diff --git a/pkg/proxy/durable_workflows.go b/pkg/proxy/durable_workflows.go new file mode 100644 index 0000000..9cb4e21 --- /dev/null +++ b/pkg/proxy/durable_workflows.go @@ -0,0 +1,267 @@ +package proxy + +import ( + "fmt" + "net/http" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/cespare/xxhash/v2" + "github.com/microsoft/durabletask-go/task" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" +) + +const ( + lockResourceType = "lock" + lockRelationName = "workflow" + workflowResourceType = "workflow" + MaxKubeAttempts = 5 + OptimisticWriteToSpiceDBAndKube = "OptimisticWriteToSpiceDBAndKube" + LockingWriteToSpiceDBAndKube = "LockingWriteToSpiceDBAndKube" + WriteToSpiceDBActivity = "WriteToSpiceDBActivity" + WriteToKubeActivity = "WriteToKubeActivity" + CheckKubeActivity = "CheckKubeActivity" +) + +type CreateObjInput struct { + RequestInfo *request.RequestInfo + UserInfo *user.DefaultInfo + ObjectMeta *metav1.ObjectMeta + Body []byte +} + +type KubeReqInput struct { + RequestInfo *request.RequestInfo + ObjectMeta *metav1.ObjectMeta + Body []byte +} + +type KubeResp struct { + Body []byte + ContentType string + StatusCode int + Err k8serrors.StatusError +} + +type RollbackRelationships []*v1.Relationship + +func NewRollbackRelationships(rels ...*v1.Relationship) *RollbackRelationships { + r := RollbackRelationships(rels) + return &r +} + +func (r *RollbackRelationships) WithRel(relationship *v1.Relationship) *RollbackRelationships { + *r = append(*r, relationship) + return r +} + +func (r *RollbackRelationships) Cleanup(ctx *task.OrchestrationContext) { + updates := make([]*v1.RelationshipUpdate, 0, len(*r)) + for _, rel := range *r { + rel := rel + updates = append(updates, &v1.RelationshipUpdate{ + Operation: v1.RelationshipUpdate_OPERATION_DELETE, + Relationship: rel, + }) + } + // Should this be a separate workflow? + for { + var delResp v1.WriteRelationshipsResponse + if err := ctx.CallActivity(WriteToSpiceDBActivity, task.WithActivityInput(&v1.WriteRelationshipsRequest{ + Updates: updates, + })).Await(&delResp); err != nil { + fmt.Println("error deleting lock tuple", err) + continue + } + // no error, delete succeeded, exit loop + break + } +} + +// LockingWriteToSpiceDBAndKube ensures that a write exists in both SpiceDB +// and kube, or neither, using locks. It prevents multiple users from writing +// the same object/fields at the same time +func (s *Server) LockingWriteToSpiceDBAndKube(ctx *task.OrchestrationContext) (any, error) { + var input CreateObjInput + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + + // this is hardcoded for namespaces for now; should be configurable and + // come from the workflow input + spiceDBRelationship := SpiceDBNamespaceRel(input) + lockTuple := LockRel(input, string(ctx.ID)) + + // tuples to remove when the workflow is complete. + // in some cases we will roll back the input, in all cases we remove + // the lock when complete. + rollback := NewRollbackRelationships(lockTuple) + + var resp v1.WriteRelationshipsResponse + if err := ctx.CallActivity(WriteToSpiceDBActivity, task.WithActivityInput(&v1.WriteRelationshipsRequest{ + OptionalPreconditions: []*v1.Precondition{{ + Operation: v1.Precondition_OPERATION_MUST_NOT_MATCH, + Filter: &v1.RelationshipFilter{ + ResourceType: lockResourceType, + OptionalResourceId: lockTuple.Resource.ObjectId, + OptionalRelation: lockRelationName, + OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: workflowResourceType}, + }, + }}, + Updates: []*v1.RelationshipUpdate{{ + Operation: v1.RelationshipUpdate_OPERATION_TOUCH, + Relationship: spiceDBRelationship, + }, { + Operation: v1.RelationshipUpdate_OPERATION_TOUCH, + Relationship: lockTuple, + }}, + })).Await(&resp); err != nil { + // request failed for some reason + fmt.Println("spicedb write failed", err) + + rollback.WithRel(spiceDBRelationship).Cleanup(ctx) + + // if the spicedb write fails, report it as a kube conflict error + // we return this for any error, not just lock conflicts, so that the + // user will attempt to retry instead of the workflow (nothing from the + // workflow has succeeded, so there's not much use in retrying automatically). + return KubeConflict(err, input), nil + } + + for i := 0; i <= MaxKubeAttempts; i++ { + // Attempt to write to kube + var out KubeResp + if err := ctx.CallActivity(WriteToKubeActivity, task.WithActivityInput(&KubeReqInput{ + RequestInfo: input.RequestInfo, + ObjectMeta: input.ObjectMeta, + Body: input.Body, + })).Await(&out); err != nil { + // didn't get a response from kube, try again + fmt.Println("kube write failed", err, out) + continue + } + + if out.StatusCode == http.StatusConflict || out.StatusCode == http.StatusCreated { + rollback.Cleanup(ctx) + return out, nil + } + + // some other status code is some other type of error, remove + // the original tuple and the lock tuple + rollback.WithRel(spiceDBRelationship).Cleanup(ctx) + return out, nil + } + rollback.WithRel(spiceDBRelationship).Cleanup(ctx) + return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts", MaxKubeAttempts) +} + +// OptimisticWriteToSpiceDBAndKube ensures that a write exists in both SpiceDB and kube, +// or neither. It attempts to perform the writes and rolls back if errors are +// encountered, leaving the user to retry on write conflicts. +func (s *Server) OptimisticWriteToSpiceDBAndKube(ctx *task.OrchestrationContext) (any, error) { + var input CreateObjInput + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + + // this is hardcoded for namespaces for now; should be configurable and + // come from the workflow input + spiceDBRelationship := SpiceDBNamespaceRel(input) + rollback := NewRollbackRelationships(spiceDBRelationship) + + var resp v1.WriteRelationshipsResponse + if err := ctx.CallActivity(WriteToSpiceDBActivity, task.WithActivityInput(&v1.WriteRelationshipsRequest{ + Updates: []*v1.RelationshipUpdate{{ + Operation: v1.RelationshipUpdate_OPERATION_CREATE, + Relationship: spiceDBRelationship, + }}, + })).Await(&resp); err != nil { + rollback.Cleanup(ctx) + // report spicedb write errors as conflicts + return KubeConflict(err, input), nil + } + + var out KubeResp + if err := ctx.CallActivity(WriteToKubeActivity, task.WithActivityInput(&KubeReqInput{ + RequestInfo: input.RequestInfo, + ObjectMeta: input.ObjectMeta, + Body: input.Body, + })).Await(&out); err != nil { + // if there's an error, might need to roll back the spicedb write + + // check if object exists - we might have failed the write task but + // succeeded in writing to kube + var exists bool + if err := ctx.CallActivity(CheckKubeActivity, task.WithActivityInput(&KubeReqInput{ + RequestInfo: input.RequestInfo, + ObjectMeta: input.ObjectMeta, + Body: input.Body, + })).Await(&exists); err != nil { + return nil, err + } + + // if the object doesn't exist, clean up the spicedb write + if !exists { + rollback.Cleanup(ctx) + return nil, err + } + } + return out, nil +} + +// LockRel generates a relationship representing a worfklow's lock over a +// specific resource in kube. +func LockRel(input CreateObjInput, id string) *v1.Relationship { + lockKey := input.RequestInfo.Path + "/" + input.ObjectMeta.GetName() + "/" + input.RequestInfo.Verb + lockHash := fmt.Sprintf("%x", xxhash.Sum64String(lockKey)) + return &v1.Relationship{ + Resource: &v1.ObjectReference{ + ObjectType: lockResourceType, + ObjectId: lockHash, + }, + Relation: lockRelationName, + Subject: &v1.SubjectReference{ + Object: &v1.ObjectReference{ + ObjectType: workflowResourceType, + ObjectId: id, + }, + }, + } +} + +// SpiceDBNamespaceRel returns a tuple to write when creating a namespace. +// This is hardcoded for namespaces for now; should be configurable and +// come from the workflow input +func SpiceDBNamespaceRel(input CreateObjInput) *v1.Relationship { + return &v1.Relationship{ + Resource: &v1.ObjectReference{ + ObjectType: "namespace", + ObjectId: input.ObjectMeta.Name, + }, + Relation: "creator", + Subject: &v1.SubjectReference{ + Object: &v1.ObjectReference{ + ObjectType: "user", + ObjectId: input.UserInfo.GetName(), + }, + }, + } +} + +// KubeConflict wraps an error and turns it into a standard kube conflict +// response. +func KubeConflict(err error, input CreateObjInput) KubeResp { + var out KubeResp + statusError := k8serrors.NewConflict(schema.GroupResource{ + Group: input.RequestInfo.APIGroup, + Resource: input.RequestInfo.Resource, + }, input.ObjectMeta.Name, err) + out.StatusCode = http.StatusConflict + out.Err = *statusError + out.Body, _ = json.Marshal(statusError) + return out +} diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 488a81f..19321c8 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -25,6 +25,8 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server" genericfilters "k8s.io/apiserver/pkg/server/filters" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/klog/v2" ) @@ -32,8 +34,14 @@ import ( type Server struct { opts Options Handler http.Handler - taskHubWorker backend.TaskHubWorker - spiceDBClient v1.PermissionsServiceClient + TaskHubWorker backend.TaskHubWorker + SpiceDBClient v1.PermissionsServiceClient + KubeClient *kubernetes.Clientset + + // LockMode references the name of the workflow to run for dual writes + // This is very temporary, and should be replaced with per-request + // configuration. + LockMode string } func NewServer(ctx context.Context, o Options) (*Server, error) { @@ -43,31 +51,43 @@ func NewServer(ctx context.Context, o Options) (*Server, error) { if err != nil { return nil, err } - s.spiceDBClient = v1.NewPermissionsServiceClient(conn) + s.SpiceDBClient = v1.NewPermissionsServiceClient(conn) watchClient := v1.NewWatchServiceClient(conn) + restConfig, err := clientcmd.NewDefaultClientConfig(*s.opts.BackendConfig, nil).ClientConfig() + if err != nil { + return nil, err + } + s.KubeClient, err = kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, err + } + // durabletask - stores planned dual writes in a sqlite db logger := backend.DefaultLogger() r := task.NewTaskRegistry() - if err := r.AddOrchestratorN(WriteToSpiceDBAndKubeName, s.WriteToSpiceDBAndKube); err != nil { + if err := r.AddOrchestratorN(LockingWriteToSpiceDBAndKube, s.LockingWriteToSpiceDBAndKube); err != nil { + return nil, err + } + if err := r.AddOrchestratorN(OptimisticWriteToSpiceDBAndKube, s.OptimisticWriteToSpiceDBAndKube); err != nil { return nil, err } - if err := r.AddActivity(s.WriteToSpiceDB); err != nil { + if err := r.AddActivityN(WriteToSpiceDBActivity, s.WriteToSpiceDB); err != nil { return nil, err } - if err := r.AddActivity(s.WriteToKube); err != nil { + if err := r.AddActivityN(WriteToKubeActivity, s.WriteToKube); err != nil { return nil, err } - if err := r.AddActivity(s.CheckKube); err != nil { + if err := r.AddActivityN(CheckKubeActivity, s.CheckKube); err != nil { return nil, err } // note: can use the in-memory sqlite provider by specifying "" - be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions("dtx.sqlite"), logger) + be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger) executor := task.NewTaskExecutor(r) orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger) activityWorker := backend.NewActivityTaskWorker(be, executor, logger) - s.taskHubWorker = backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger) + s.TaskHubWorker = backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger) taskHubClient := backend.NewTaskHubClient(be) mux := http.NewServeMux() @@ -122,7 +142,7 @@ func NewServer(ctx context.Context, o Options) (*Server, error) { codecs := serializer.NewCodecFactory(scheme) failHandler := genericapifilters.Unauthorized(codecs) - handler := withAuthorization(clusterProxy, failHandler, s.spiceDBClient, watchClient, taskHubClient) + handler := s.WithAuthorization(clusterProxy, failHandler, watchClient, taskHubClient) handler = withAuthentication(handler, failHandler, s.opts.AuthenticationInfo.Authenticator) handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver) handler = genericfilters.WithHTTPLogging(handler) @@ -147,7 +167,7 @@ func (s *Server) Run(ctx context.Context) error { } }() go func() { - if err := s.taskHubWorker.Start(ctx); err != nil { + if err := s.TaskHubWorker.Start(ctx); err != nil { klog.FromContext(ctx).Error(err, "failed to run durable task worker") cancel() } @@ -158,7 +178,7 @@ func (s *Server) Run(ctx context.Context) error { } <-doneCh - if err := s.taskHubWorker.Shutdown(context.Background()); err != nil { + if err := s.TaskHubWorker.Shutdown(context.Background()); err != nil { return err } return nil diff --git a/pkg/spicedb/bootstrap.yaml b/pkg/spicedb/bootstrap.yaml index fcc1deb..bbdb21d 100644 --- a/pkg/spicedb/bootstrap.yaml +++ b/pkg/spicedb/bootstrap.yaml @@ -6,5 +6,9 @@ schema: |- relation viewer: user permission view = viewer + creator } + definition lock { + relation workflow: workflow + } + definition workflow {} relationships: | namespace:kube-system#viewer@user:rakis