Skip to content

Commit

Permalink
decouple distributed transaction handling from durable task implement…
Browse files Browse the repository at this point in the history
…ation

with the goal of being able to exchange the underlying implementation
  • Loading branch information
vroldanbet committed Sep 6, 2023
1 parent 992df9b commit 10d4b30
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 124 deletions.
3 changes: 2 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/tools/setup-envtest/workflows"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy"
"github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx"
)

var (
Expand Down Expand Up @@ -113,7 +114,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
Expect(err).To(Succeed())

// speed up backoff for tests
proxy.KubeBackoff.Duration = 1 * time.Microsecond
distributedtx.KubeBackoff.Duration = 1 * time.Microsecond

ctx, cancel := context.WithCancel(context.Background())
DeferCleanup(cancel)
Expand Down
10 changes: 5 additions & 5 deletions e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"k8s.io/client-go/tools/clientcmd"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints"
"github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy"
"github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx"
)

var _ = Describe("Proxy", func() {
Expand Down Expand Up @@ -119,9 +119,9 @@ var _ = Describe("Proxy", func() {

// make kube write fail for chani's namespace, spicedb write will have
// succeeded
if proxySrv.LockMode == proxy.PessimisticWriteToSpiceDBAndKube {
if proxySrv.LockMode == distributedtx.StrategyPessimisticWriteToSpiceDBAndKube {
// the locking version retries if the connection fails
failpoints.EnableFailPoint("panicKubeWrite", proxy.MaxKubeAttempts+1)
failpoints.EnableFailPoint("panicKubeWrite", distributedtx.MaxKubeAttempts+1)
} else {
failpoints.EnableFailPoint("panicKubeWrite", 1)
}
Expand Down Expand Up @@ -279,14 +279,14 @@ var _ = Describe("Proxy", func() {

When("optimistic locking is used", func() {
BeforeEach(func() {
proxySrv.LockMode = proxy.OptimisticWriteToSpiceDBAndKube
proxySrv.LockMode = distributedtx.StrategyOptimisticWriteToSpiceDBAndKube
})
AssertDualWriteBehavior()
})

When("pessimistic locking is used", func() {
BeforeEach(func() {
proxySrv.LockMode = proxy.PessimisticWriteToSpiceDBAndKube
proxySrv.LockMode = distributedtx.StrategyPessimisticWriteToSpiceDBAndKube
})
AssertDualWriteBehavior()
})
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ go 1.20

require (
github.com/authzed/authzed-go v0.9.1-0.20230810180432-2fb0fd4c66dd
github.com/authzed/grpcutil v0.0.0-20230703173955-bdd0ac3f16a5
github.com/authzed/spicedb v1.24.1-0.20230821163419-e4bb3adfd50b
github.com/cespare/xxhash/v2 v2.2.0
github.com/dustin/go-humanize v1.0.1
github.com/google/uuid v1.3.0
github.com/microsoft/durabletask-go v0.3.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.56.2
k8s.io/apimachinery v0.28.0
k8s.io/apiserver v0.28.0
Expand Down Expand Up @@ -47,7 +50,6 @@ require (
github.com/IBM/pgxpoolprometheus v1.1.1 // indirect
github.com/Masterminds/squirrel v1.5.4 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/authzed/grpcutil v0.0.0-20230703173955-bdd0ac3f16a5 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
Expand Down Expand Up @@ -91,7 +93,6 @@ require (
github.com/google/go-github/v43 v43.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
Expand Down Expand Up @@ -159,7 +160,6 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.16.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
Expand Down
27 changes: 13 additions & 14 deletions pkg/proxy/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,26 @@ import (
"net/http"
"sync"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/user"
"github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
)

func WithAuthorization(handler, failed http.Handler, permissionsClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient, lockMode *string) (http.Handler, error) {
func WithAuthorization(handler, failed http.Handler, permissionsClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, scheduler distributedtx.TaskScheduler, lockMode *string) (http.Handler, error) {
if *lockMode == "" {
return nil, fmt.Errorf("lock mode is undefined")
}

if !(*lockMode == PessimisticWriteToSpiceDBAndKube || *lockMode == OptimisticWriteToSpiceDBAndKube) {
if !(*lockMode == distributedtx.StrategyPessimisticWriteToSpiceDBAndKube || *lockMode == distributedtx.StrategyOptimisticWriteToSpiceDBAndKube) {
return nil, fmt.Errorf("unexpected lock mode: %s", *lockMode)
}

Expand Down Expand Up @@ -75,18 +74,18 @@ func WithAuthorization(handler, failed http.Handler, permissionsClient v1.Permis
return
}

id, err := taskHubClient.ScheduleNewOrchestration(ctx, *lockMode, api.WithInput(CreateObjInput{
id, err := scheduler.Schedule(ctx, *lockMode, &distributedtx.CreateObjInput{
RequestInfo: requestInfo,
UserInfo: userInfo.(*user.DefaultInfo),
ObjectMeta: &pom.ObjectMeta,
Body: body,
}))
})
if err != nil {
fmt.Println(err)
failed.ServeHTTP(w, req)
return
}
metadata, err := taskHubClient.WaitForOrchestrationCompletion(ctx, id)
metadata, err := scheduler.WaitForCompletion(ctx, id)
if err != nil {
fmt.Println(err)
failed.ServeHTTP(w, req)
Expand All @@ -103,8 +102,8 @@ func WithAuthorization(handler, failed http.Handler, permissionsClient v1.Permis
}))
close(allowed)

var resp KubeResp
if err := json.Unmarshal([]byte(metadata.SerializedOutput), &resp); err != nil {
var resp distributedtx.KubeResp
if err := json.Unmarshal(metadata.SerializedOutput, &resp); err != nil {
fmt.Println(err)
failed.ServeHTTP(w, req)
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,75 @@
package proxy
package distributedtx

import (
"context"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/microsoft/durabletask-go/task"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/rest"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints"
)

type ExecutionContext interface {
GetInput(resultPtr any) error
Context() context.Context
}

type IdentifiableExecutionContext interface {
ExecutionContext
ID() string
}

type CreateObjInput struct {
RequestInfo *request.RequestInfo
UserInfo *user.DefaultInfo
ObjectMeta *metav1.ObjectMeta
Body []byte
}

type KubeReqInput struct {
RequestInfo *request.RequestInfo
ObjectMeta *metav1.ObjectMeta
Body []byte
}

type KubeResp struct {
Body []byte
ContentType string
StatusCode int
Err k8serrors.StatusError
}

type Handler struct {
PermissionClient v1.PermissionsServiceClient
KubeClient rest.Interface
}

// WriteToSpiceDB writes relationships to spicedb and returns any errors.
func (s *Server) WriteToSpiceDB(ctx task.ActivityContext) (any, error) {
func (h *Handler) WriteToSpiceDB(ctx ExecutionContext) (any, error) {
var req v1.WriteRelationshipsRequest
if err := ctx.GetInput(&req); err != nil {
return nil, err
}
failpoints.FailPoint("panicWriteSpiceDB")
out, err := s.PermissionClient().WriteRelationships(ctx.Context(), &req)
out, err := h.PermissionClient.WriteRelationships(ctx.Context(), &req)
failpoints.FailPoint("panicSpiceDBReadResp")
return out, err
}

// WriteToKube peforms a Kube API Server POST, specified in a KubeReqInput propagated via the task.ActivityContext arg
func (s *Server) WriteToKube(ctx task.ActivityContext) (any, error) {
func (h *Handler) WriteToKube(ctx ExecutionContext) (any, error) {
var req KubeReqInput
if err := ctx.GetInput(&req); err != nil {
return nil, err
}

failpoints.FailPoint("panicKubeWrite")

kreq := s.KubeClient.RESTClient().Post().
kreq := h.KubeClient.Post().
RequestURI(req.RequestInfo.Path).
Body(req.Body)
if len(req.RequestInfo.Namespace) > 0 {
Expand All @@ -50,13 +90,13 @@ func (s *Server) WriteToKube(ctx task.ActivityContext) (any, error) {
return resp, nil
}

func (s *Server) CheckKube(ctx task.ActivityContext) (any, error) {
func (h *Handler) CheckKube(ctx ExecutionContext) (any, error) {
var req KubeReqInput
if err := ctx.GetInput(&req); err != nil {
return nil, err
}

// TODO: this is somewhat janky
res := s.KubeClient.RESTClient().Get().RequestURI(req.RequestInfo.Path + "/" + req.ObjectMeta.GetName()).Do(ctx.Context())
res := h.KubeClient.Get().RequestURI(req.RequestInfo.Path + "/" + req.ObjectMeta.GetName()).Do(ctx.Context())
return !k8serrors.IsNotFound(res.Error()), nil
}
Loading

0 comments on commit 10d4b30

Please sign in to comment.