From 598788b4b096d34b13db80b959df8b875c867e3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Fri, 15 Sep 2023 01:49:10 -0400 Subject: [PATCH] implement dual-write workflows with go-workflows --- e2e/go.mod | 19 +- e2e/go.sum | 40 +--- e2e/proxy_test.go | 45 ++--- go.mod | 19 +- go.sum | 40 +--- pkg/proxy/authz.go | 26 ++- pkg/proxy/distributedtx/go_workflows.go | 70 +++++++ pkg/proxy/distributedtx/tasks.go | 69 ++++--- pkg/proxy/distributedtx/workflow_test.go | 103 ++++++++++ pkg/proxy/distributedtx/workflows.go | 223 ++++++++++------------ pkg/proxy/durabletask/durabletask.go | 131 ------------- pkg/proxy/durabletask/durabletask_test.go | 89 --------- pkg/proxy/options.go | 13 +- pkg/proxy/server.go | 29 ++- 14 files changed, 390 insertions(+), 526 deletions(-) create mode 100644 pkg/proxy/distributedtx/go_workflows.go create mode 100644 pkg/proxy/distributedtx/workflow_test.go delete mode 100644 pkg/proxy/durabletask/durabletask.go delete mode 100644 pkg/proxy/durabletask/durabletask_test.go diff --git a/e2e/go.mod b/e2e/go.mod index 3bb3e69..0077cf5 100644 --- a/e2e/go.mod +++ b/e2e/go.mod @@ -53,6 +53,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/creasty/defaults v1.7.0 // indirect + github.com/cschleiden/go-workflows v0.16.2 // indirect github.com/dalzilio/rudd v1.1.1-0.20230806153452-9e08a6ea8170 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlmiddlecote/sqlstats v1.0.2 // indirect @@ -69,6 +70,7 @@ require ( github.com/fatih/color v1.15.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-errors/errors v1.4.2 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zerologr v1.2.3 // indirect @@ -111,23 +113,22 @@ require ( github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb // indirect github.com/jackc/pgx/v5 v5.4.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jellydator/ttlcache/v3 v3.0.0 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jzelinskie/cobrautil/v2 v2.0.0-20230714172849-80717639cec5 // indirect github.com/jzelinskie/stringz v0.0.2 // indirect - github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lthibault/jitterbug v2.0.0+incompatible // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect - github.com/marusama/semaphore/v2 v2.5.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-sqlite3 v1.14.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/microsoft/durabletask-go v0.3.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -147,7 +148,6 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/cors v1.9.0 // indirect github.com/rs/zerolog v1.29.1 // indirect github.com/scylladb/go-set v1.0.2 // indirect @@ -159,6 +159,7 @@ require ( github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace // indirect github.com/spf13/viper v1.16.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/subosito/gotenv v1.4.2 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect @@ -221,16 +222,6 @@ require ( k8s.io/mount-utils v0.0.0 // indirect k8s.io/pod-security-admission v0.0.0 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect - lukechampine.com/uint128 v1.2.0 // indirect - modernc.org/cc/v3 v3.40.0 // indirect - modernc.org/ccgo/v3 v3.16.13 // indirect - modernc.org/libc v1.22.5 // indirect - modernc.org/mathutil v1.5.0 // indirect - modernc.org/memory v1.5.0 // indirect - modernc.org/opt v0.1.3 // indirect - modernc.org/sqlite v1.22.1 // indirect - modernc.org/strutil v1.1.3 // indirect - modernc.org/token v1.0.1 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect diff --git a/e2e/go.sum b/e2e/go.sum index aee69f0..215a13a 100644 --- a/e2e/go.sum +++ b/e2e/go.sum @@ -129,6 +129,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creasty/defaults v1.7.0 h1:eNdqZvc5B509z18lD8yc212CAqJNvfT1Jq6L8WowdBA= github.com/creasty/defaults v1.7.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= +github.com/cschleiden/go-workflows v0.16.2 h1:rRKAJ/HdMFUAcZoJYJ/o7kH7Yvn9Zs3zKtSSzVQUVkk= +github.com/cschleiden/go-workflows v0.16.2/go.mod h1:uH7DoncdkBSk5U/azOSf6Yn32Or5KlSv15RoMQze4i0= github.com/dalzilio/rudd v1.1.1-0.20230806153452-9e08a6ea8170 h1:bHEN1z3EOO/IXHTQ8ZcmGoW4gTJt+mSrH2Sd458uo0E= github.com/dalzilio/rudd v1.1.1-0.20230806153452-9e08a6ea8170/go.mod h1:IxPC4Bdi3WqUwyGBMgLrWWGx67aRtUAZmOZrkIr7qaM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -180,6 +182,8 @@ github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0X github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -356,6 +360,8 @@ github.com/jackc/pgx/v5 v5.4.2 h1:u1gmGDwbdRUZiwisBm/Ky2M14uQyUP65bG8+20nnyrg= github.com/jackc/pgx/v5 v5.4.2/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jellydator/ttlcache/v3 v3.0.0 h1:zmFhqrB/4sKiEiJHhtseJsNRE32IMVmJSs4++4gaQO4= +github.com/jellydator/ttlcache/v3 v3.0.0/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= @@ -373,8 +379,6 @@ github.com/jzelinskie/cobrautil/v2 v2.0.0-20230714172849-80717639cec5 h1:X7vo4fE github.com/jzelinskie/cobrautil/v2 v2.0.0-20230714172849-80717639cec5/go.mod h1:954benQgK9Oi403yRaIot4TgRM0dDLKrBj48K7J8NZg= github.com/jzelinskie/stringz v0.0.2 h1:OSjMEYvz8tjhovgZ/6cGcPID736ubeukr35mu6RYAmg= github.com/jzelinskie/stringz v0.0.2/go.mod h1:hHYbgxJuNLRw91CmpuFsYEOyQqpDVFg8pvEh23vy4P0= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= @@ -399,8 +403,6 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= -github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= @@ -410,11 +412,10 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/microsoft/durabletask-go v0.3.0 h1:n3DE/SwI9sebft3vG0QcD7ioxqy15VJMzv3HOry8dJk= -github.com/microsoft/durabletask-go v0.3.0/go.mod h1:t3u0iRvIadT1y4MD5cUG0mbTOqgANT6IFcLogv7o0M0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= @@ -481,9 +482,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= -github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -1042,30 +1040,6 @@ k8s.io/pod-security-admission v0.28.0 h1:Vz8XTjMAKHQFZv9Q4GdmO59CUtelkPPDRJTy/WT k8s.io/pod-security-admission v0.28.0/go.mod h1:hABVUcP7SRALDvESOK+RYIAWc9uZ5I1eSdcUwsOYTU8= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= -lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw= -modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0= -modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw= -modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY= -modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= -modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= -modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= -modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= -modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= -modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds= -modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= -modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= -modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/sqlite v1.22.1 h1:P2+Dhp5FR1RlVRkQ3dDfCiv3Ok8XPxqpe70IjYVA9oE= -modernc.org/sqlite v1.22.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk= -modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY= -modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= -modernc.org/tcl v1.15.2 h1:C4ybAYCGJw968e+Me18oW55kD/FexcHbqH2xak1ROSY= -modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg= -modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -modernc.org/z v1.7.3 h1:zDJf6iHjrnB+WRD88stbXokugjyc0/pB91ri1gO6LZY= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/e2e/proxy_test.go b/e2e/proxy_test.go index 87d14ca..ec0e4af 100644 --- a/e2e/proxy_test.go +++ b/e2e/proxy_test.go @@ -113,7 +113,7 @@ var _ = Describe("Proxy", func() { Expect(chaniList).To(ContainElement(chaniNamespace)) }) - It("cleans up dual writes on kube failures", func(ctx context.Context) { + It("recovers when there are kube failures", func(ctx context.Context) { // paul creates his namespace Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) @@ -125,21 +125,22 @@ var _ = Describe("Proxy", func() { } else { failpoints.EnableFailPoint("panicKubeWrite", 1) } - Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil()) + // Chani's write panics, but is retried + Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) - // paul creates chani's namespace - Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + // paul isn't able to create chanis namespace + Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).ToNot(BeNil()) - // paul can get both namespaces + // paul can only get his namespace Expect(GetNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) - Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + Expect(GetNamespace(ctx, paulClient, chaniNamespace)).ToNot(BeNil()) - // chani can't get her namespace - this indicates the spicedb write was rolled back - // from the failed dual write above - Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) + // chani can get her namespace - this indicates the workflow was retried and eventually succeeded + Expect(GetNamespace(ctx, chaniClient, paulNamespace)).ToNot(BeNil()) + Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) }) - It("recovers dual writes when kube write succeeds but crashes", func(ctx context.Context) { + It("recovers when kube write succeeds but crashes", func(ctx context.Context) { // paul creates his namespace Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) @@ -179,34 +180,30 @@ var _ = Describe("Proxy", func() { Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) }) - It("recovers dual writes when there are spicedb write failures", func(ctx context.Context) { + It("recovers when there are spicedb write failures", func(ctx context.Context) { // paul creates his namespace Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) - // make spicedb write crash on chani's namespace write + // make spicedb write crash on chani's namespace write, eventually succeeds failpoints.EnableFailPoint("panicWriteSpiceDB", 1) - Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).ToNot(BeNil()) + Expect(CreateNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) - // paul creates chani's namespace so that the namespace exists - Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + // paul is unable to create chani's namespace as it's already claimed + Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).ToNot(BeNil()) - // check that chani can't get her namespace, indirectly showing - // that the spicedb write was rolled back - Expect(k8serrors.IsNotFound(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) + // Check Chani is able to get namespace + Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) - // confirm the relationship doesn't exist + // confirm the relationship exists Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{ ResourceType: "namespace", OptionalResourceId: chaniNamespace, OptionalRelation: "creator", OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"}, - }))).To(BeZero()) - - // confirm paul can get the namespace - Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + }))).ToNot(BeZero()) }) - It("recovers dual writes when spicedb write succeeds but crashes", func(ctx context.Context) { + It("recovers when spicedb write succeeds but crashes", func(ctx context.Context) { // paul creates his namespace Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) diff --git a/go.mod b/go.mod index 9e42ed9..3c049d2 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,9 @@ require ( github.com/authzed/grpcutil v0.0.0-20230703173955-bdd0ac3f16a5 github.com/authzed/spicedb v1.24.1-0.20230821163419-e4bb3adfd50b github.com/cespare/xxhash/v2 v2.2.0 + github.com/cschleiden/go-workflows v0.16.2 github.com/dustin/go-humanize v1.0.1 github.com/google/uuid v1.3.0 - github.com/microsoft/durabletask-go v0.3.0 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace github.com/stretchr/testify v1.8.4 @@ -76,6 +76,7 @@ require ( github.com/fatih/color v1.15.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-errors/errors v1.4.2 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.2.4 // indirect @@ -114,20 +115,20 @@ require ( github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb // indirect github.com/jackc/pgx/v5 v5.4.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jellydator/ttlcache/v3 v3.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jzelinskie/cobrautil/v2 v2.0.0-20230714172849-80717639cec5 // indirect github.com/jzelinskie/stringz v0.0.2 // indirect - github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lthibault/jitterbug v2.0.0+incompatible // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect - github.com/marusama/semaphore/v2 v2.5.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-sqlite3 v1.14.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect @@ -148,7 +149,6 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/cors v1.9.0 // indirect github.com/rs/zerolog v1.29.1 // indirect github.com/samber/lo v1.38.1 // indirect @@ -160,6 +160,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.16.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect @@ -214,16 +215,6 @@ require ( k8s.io/mount-utils v0.0.0 // indirect k8s.io/pod-security-admission v0.0.0 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect - lukechampine.com/uint128 v1.2.0 // indirect - modernc.org/cc/v3 v3.40.0 // indirect - modernc.org/ccgo/v3 v3.16.13 // indirect - modernc.org/libc v1.22.5 // indirect - modernc.org/mathutil v1.5.0 // indirect - modernc.org/memory v1.5.0 // indirect - modernc.org/opt v0.1.3 // indirect - modernc.org/sqlite v1.22.1 // indirect - modernc.org/strutil v1.1.3 // indirect - modernc.org/token v1.0.1 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 // indirect sigs.k8s.io/controller-runtime v0.15.0 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/go.sum b/go.sum index 50b632c..41ea070 100644 --- a/go.sum +++ b/go.sum @@ -129,6 +129,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creasty/defaults v1.7.0 h1:eNdqZvc5B509z18lD8yc212CAqJNvfT1Jq6L8WowdBA= github.com/creasty/defaults v1.7.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM= +github.com/cschleiden/go-workflows v0.16.2 h1:rRKAJ/HdMFUAcZoJYJ/o7kH7Yvn9Zs3zKtSSzVQUVkk= +github.com/cschleiden/go-workflows v0.16.2/go.mod h1:uH7DoncdkBSk5U/azOSf6Yn32Or5KlSv15RoMQze4i0= github.com/dalzilio/rudd v1.1.1-0.20230806153452-9e08a6ea8170 h1:bHEN1z3EOO/IXHTQ8ZcmGoW4gTJt+mSrH2Sd458uo0E= github.com/dalzilio/rudd v1.1.1-0.20230806153452-9e08a6ea8170/go.mod h1:IxPC4Bdi3WqUwyGBMgLrWWGx67aRtUAZmOZrkIr7qaM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -179,6 +181,8 @@ github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0X github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -353,6 +357,8 @@ github.com/jackc/pgx/v5 v5.4.2 h1:u1gmGDwbdRUZiwisBm/Ky2M14uQyUP65bG8+20nnyrg= github.com/jackc/pgx/v5 v5.4.2/go.mod h1:q6iHT8uDNXWiFNOlRqJzBTaSH3+2xCXkokxHZC5qWFY= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jellydator/ttlcache/v3 v3.0.0 h1:zmFhqrB/4sKiEiJHhtseJsNRE32IMVmJSs4++4gaQO4= +github.com/jellydator/ttlcache/v3 v3.0.0/go.mod h1:WwTaEmcXQ3MTjOm4bsZoDFiCu/hMvNWLO1w67RXz6h4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= @@ -369,8 +375,6 @@ github.com/jzelinskie/cobrautil/v2 v2.0.0-20230714172849-80717639cec5 h1:X7vo4fE github.com/jzelinskie/cobrautil/v2 v2.0.0-20230714172849-80717639cec5/go.mod h1:954benQgK9Oi403yRaIot4TgRM0dDLKrBj48K7J8NZg= github.com/jzelinskie/stringz v0.0.2 h1:OSjMEYvz8tjhovgZ/6cGcPID736ubeukr35mu6RYAmg= github.com/jzelinskie/stringz v0.0.2/go.mod h1:hHYbgxJuNLRw91CmpuFsYEOyQqpDVFg8pvEh23vy4P0= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= @@ -395,8 +399,6 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= -github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= @@ -406,11 +408,10 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/microsoft/durabletask-go v0.3.0 h1:n3DE/SwI9sebft3vG0QcD7ioxqy15VJMzv3HOry8dJk= -github.com/microsoft/durabletask-go v0.3.0/go.mod h1:t3u0iRvIadT1y4MD5cUG0mbTOqgANT6IFcLogv7o0M0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= @@ -475,9 +476,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= -github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -1036,30 +1034,6 @@ k8s.io/pod-security-admission v0.28.0 h1:Vz8XTjMAKHQFZv9Q4GdmO59CUtelkPPDRJTy/WT k8s.io/pod-security-admission v0.28.0/go.mod h1:hABVUcP7SRALDvESOK+RYIAWc9uZ5I1eSdcUwsOYTU8= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= -lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw= -modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0= -modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw= -modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY= -modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= -modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= -modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= -modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= -modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= -modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds= -modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= -modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= -modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/sqlite v1.22.1 h1:P2+Dhp5FR1RlVRkQ3dDfCiv3Ok8XPxqpe70IjYVA9oE= -modernc.org/sqlite v1.22.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk= -modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY= -modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= -modernc.org/tcl v1.15.2 h1:C4ybAYCGJw968e+Me18oW55kD/FexcHbqH2xak1ROSY= -modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg= -modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -modernc.org/z v1.7.3 h1:zDJf6iHjrnB+WRD88stbXokugjyc0/pB91ri1gO6LZY= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/proxy/authz.go b/pkg/proxy/authz.go index 86cb82a..e7e0c85 100644 --- a/pkg/proxy/authz.go +++ b/pkg/proxy/authz.go @@ -14,6 +14,8 @@ import ( "github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/cschleiden/go-workflows/client" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -25,7 +27,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" ) -func WithAuthorization(handler, failed http.Handler, permissionsClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, scheduler distributedtx.TaskScheduler, lockMode *string) (http.Handler, error) { +func WithAuthorization(handler, failed http.Handler, permissionsClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, workflowClient client.Client, lockMode *string) (http.Handler, error) { if *lockMode == "" { return nil, fmt.Errorf("lock mode is undefined") } @@ -74,7 +76,14 @@ func WithAuthorization(handler, failed http.Handler, permissionsClient v1.Permis return } - id, err := scheduler.Schedule(ctx, *lockMode, &distributedtx.CreateObjInput{ + workflow := distributedtx.PessimisticWriteToSpiceDBAndKube + if *lockMode == distributedtx.StrategyOptimisticWriteToSpiceDBAndKube { + workflow = distributedtx.OptimisticWriteToSpiceDBAndKube + } + + id, err := workflowClient.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{ + InstanceID: uuid.NewString(), + }, workflow, &distributedtx.CreateObjInput{ RequestInfo: requestInfo, UserInfo: userInfo.(*user.DefaultInfo), ObjectMeta: &pom.ObjectMeta, @@ -85,7 +94,10 @@ func WithAuthorization(handler, failed http.Handler, permissionsClient v1.Permis failed.ServeHTTP(w, req) return } - metadata, err := scheduler.WaitForCompletion(ctx, id) + + // TODO refactor deadline + + resp, err := client.GetWorkflowResult[distributedtx.KubeResp](ctx, workflowClient, id, distributedtx.DefaultWorkflowTimeout) if err != nil { fmt.Println(err) failed.ServeHTTP(w, req) @@ -102,14 +114,8 @@ func WithAuthorization(handler, failed http.Handler, permissionsClient v1.Permis })) close(allowed) - var resp distributedtx.KubeResp - if err := json.Unmarshal(metadata.SerializedOutput, &resp); err != nil { - fmt.Println(err) - failed.ServeHTTP(w, req) - return - } // this can happen if there are un-recoverable failures in the - // durable fn execution + // workflow execution if resp.Body == nil { failed.ServeHTTP(w, req) return diff --git a/pkg/proxy/distributedtx/go_workflows.go b/pkg/proxy/distributedtx/go_workflows.go new file mode 100644 index 0000000..ff1408c --- /dev/null +++ b/pkg/proxy/distributedtx/go_workflows.go @@ -0,0 +1,70 @@ +package distributedtx + +import ( + "context" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/cschleiden/go-workflows/backend" + "github.com/cschleiden/go-workflows/backend/sqlite" + "github.com/cschleiden/go-workflows/client" + "github.com/cschleiden/go-workflows/worker" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" +) + +func SetupWithMemoryBackend(ctx context.Context, permissionClient v1.PermissionsServiceClient, kubeClient rest.Interface) (client.Client, *Worker, error) { + ctx = klog.NewContext(ctx, klog.FromContext(ctx).WithValues("backend", "sqlite-memory")) + return SetupWithBackend(ctx, permissionClient, kubeClient, sqlite.NewInMemoryBackend()) +} + +func SetupWithSQLiteBackend(ctx context.Context, permissionClient v1.PermissionsServiceClient, kubeClient rest.Interface, sqlitePath string) (client.Client, *Worker, error) { + if sqlitePath == "" { + return SetupWithMemoryBackend(ctx, permissionClient, kubeClient) + } + + ctx = klog.NewContext(ctx, klog.FromContext(ctx).WithValues("backend", "sqlite-file")) + return SetupWithBackend(ctx, permissionClient, kubeClient, sqlite.NewSqliteBackend(sqlitePath)) +} + +func SetupWithBackend(ctx context.Context, permissionClient v1.PermissionsServiceClient, kubeClient rest.Interface, backend backend.Backend) (client.Client, *Worker, error) { + klog.FromContext(ctx).Info("starting workflow engine") + txHandler := ActivityHandler{ + PermissionClient: permissionClient, + KubeClient: kubeClient, + } + + w := worker.New(backend, &worker.DefaultWorkerOptions) + + if err := w.RegisterWorkflow(PessimisticWriteToSpiceDBAndKube); err != nil { + return nil, nil, err + } + if err := w.RegisterWorkflow(OptimisticWriteToSpiceDBAndKube); err != nil { + return nil, nil, err + } + if err := w.RegisterActivity(txHandler.WriteToKube); err != nil { + return nil, nil, err + } + if err := w.RegisterActivity(txHandler.CheckKube); err != nil { + return nil, nil, err + } + if err := w.RegisterActivity(txHandler.WriteToSpiceDB); err != nil { + return nil, nil, err + } + + return client.New(backend), &Worker{worker: w}, nil +} + +type Worker struct { + worker worker.Worker + shutdownFunc func() +} + +func (w *Worker) Start(ctx context.Context) error { + ctx, w.shutdownFunc = context.WithCancel(ctx) + return w.worker.Start(ctx) +} + +func (w *Worker) Shutdown(_ context.Context) error { + w.shutdownFunc() + return w.worker.WaitForCompletion() +} diff --git a/pkg/proxy/distributedtx/tasks.go b/pkg/proxy/distributedtx/tasks.go index 1c6ed4a..ec307c9 100644 --- a/pkg/proxy/distributedtx/tasks.go +++ b/pkg/proxy/distributedtx/tasks.go @@ -2,6 +2,7 @@ package distributedtx import ( "context" + "fmt" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -13,20 +14,6 @@ import ( "github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints" ) -type ExecutionInput interface { - GetInput(resultPtr any) error -} - -type ExecutionContext interface { - ExecutionInput - Context() context.Context -} - -type IdentifiableExecutionInput interface { - ExecutionInput - ID() string -} - type CreateObjInput struct { RequestInfo *request.RequestInfo UserInfo *user.DefaultInfo @@ -34,6 +21,28 @@ type CreateObjInput struct { Body []byte } +func (input *CreateObjInput) validate() error { + if input.UserInfo.GetName() == "" { + return fmt.Errorf("missing user info in CreateObjectInput") + } + + if input.ObjectMeta.Name == "" { + return fmt.Errorf("missing object meta in CreateObjectInput") + } + + // TODO more validation + + return nil +} + +func (input *CreateObjInput) toKubeReqInput() *KubeReqInput { + return &KubeReqInput{ + RequestInfo: input.RequestInfo, + ObjectMeta: input.ObjectMeta, + Body: input.Body, + } +} + type KubeReqInput struct { RequestInfo *request.RequestInfo ObjectMeta *metav1.ObjectMeta @@ -47,30 +56,21 @@ type KubeResp struct { Err k8serrors.StatusError } -type Handler struct { +type ActivityHandler struct { PermissionClient v1.PermissionsServiceClient KubeClient rest.Interface } // WriteToSpiceDB writes relationships to spicedb and returns any errors. -func (h *Handler) WriteToSpiceDB(ctx ExecutionContext) (any, error) { - var req v1.WriteRelationshipsRequest - if err := ctx.GetInput(&req); err != nil { - return nil, err - } +func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) { failpoints.FailPoint("panicWriteSpiceDB") - out, err := h.PermissionClient.WriteRelationships(ctx.Context(), &req) + out, err := h.PermissionClient.WriteRelationships(ctx, input) failpoints.FailPoint("panicSpiceDBReadResp") return out, err } -// WriteToKube peforms a Kube API Server POST, specified in a KubeReqInput propagated via the task.ActivityContext arg -func (h *Handler) WriteToKube(ctx ExecutionContext) (any, error) { - var req KubeReqInput - if err := ctx.GetInput(&req); err != nil { - return nil, err - } - +// WriteToKube performs a Kube API Server POST, specified in a KubeReqInput +func (h *ActivityHandler) WriteToKube(ctx context.Context, req *KubeReqInput) (*KubeResp, error) { failpoints.FailPoint("panicKubeWrite") kreq := h.KubeClient.Post(). @@ -79,7 +79,7 @@ func (h *Handler) WriteToKube(ctx ExecutionContext) (any, error) { if len(req.RequestInfo.Namespace) > 0 { kreq = kreq.Namespace(req.RequestInfo.Namespace) } - res := kreq.Do(ctx.Context()) + res := kreq.Do(ctx) failpoints.FailPoint("panicKubeReadResp") @@ -91,16 +91,11 @@ func (h *Handler) WriteToKube(ctx ExecutionContext) (any, error) { resp.Body = body res.StatusCode(&resp.StatusCode) res.ContentType(&resp.ContentType) - return resp, nil + return &resp, nil } -func (h *Handler) CheckKube(ctx ExecutionContext) (any, error) { - var req KubeReqInput - if err := ctx.GetInput(&req); err != nil { - return nil, err - } - +func (h *ActivityHandler) CheckKube(ctx context.Context, req *KubeReqInput) (bool, error) { // TODO: this is somewhat janky - res := h.KubeClient.Get().RequestURI(req.RequestInfo.Path + "/" + req.ObjectMeta.GetName()).Do(ctx.Context()) + res := h.KubeClient.Get().RequestURI(req.RequestInfo.Path + "/" + req.ObjectMeta.GetName()).Do(ctx) return !k8serrors.IsNotFound(res.Error()), nil } diff --git a/pkg/proxy/distributedtx/workflow_test.go b/pkg/proxy/distributedtx/workflow_test.go new file mode 100644 index 0000000..eb9a480 --- /dev/null +++ b/pkg/proxy/distributedtx/workflow_test.go @@ -0,0 +1,103 @@ +package distributedtx + +import ( + "context" + "io" + "net/http" + "strings" + "testing" + + "github.com/authzed/spicedb-kubeapi-proxy/pkg/spicedb" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/cschleiden/go-workflows/client" + "github.com/cschleiden/go-workflows/workflow" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/rest/fake" +) + +func TestWorkflow(t *testing.T) { + for name, workflowFunc := range map[string]func(ctx workflow.Context, input *CreateObjInput) (*KubeResp, error){ + StrategyPessimisticWriteToSpiceDBAndKube: PessimisticWriteToSpiceDBAndKube, + StrategyOptimisticWriteToSpiceDBAndKube: OptimisticWriteToSpiceDBAndKube, + } { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv, err := spicedb.NewServer(ctx) + require.NoError(t, err) + go func() { + require.NoError(t, srv.Run(ctx)) + }() + + dialCtx, err := srv.GRPCDialContext(ctx) + require.NoError(t, err) + + psc := v1.NewPermissionsServiceClient(dialCtx) + + kubeClient := &fake.RESTClient{ + Client: fake.CreateHTTPClient(func(request *http.Request) (*http.Response, error) { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + resp := &http.Response{ + Header: header, + StatusCode: http.StatusCreated, + Body: io.NopCloser(strings.NewReader("{}")), + } + return resp, nil + }), + NegotiatedSerializer: &serializer.CodecFactory{}, + } + + require.NoError(t, err) + workflowClient, worker, err := SetupWithMemoryBackend(ctx, psc, kubeClient) + require.NoError(t, err) + require.NoError(t, worker.Start(ctx)) + defer func() { + require.NoError(t, worker.Shutdown(ctx)) + }() + + id, err := workflowClient.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{ + InstanceID: uuid.NewString(), + }, workflowFunc, &CreateObjInput{ + RequestInfo: &request.RequestInfo{}, + UserInfo: &user.DefaultInfo{Name: "janedoe"}, + ObjectMeta: &metav1.ObjectMeta{Name: "my_object_meta"}, + Body: []byte("{}"), + }) + require.NoError(t, err) + + resp, err := client.GetWorkflowResult[KubeResp](ctx, workflowClient, id, DefaultWorkflowTimeout) + require.NoError(t, err) + require.NotNil(t, resp) + require.Empty(t, resp.Err, "workflow returned error: %s", resp.Err) + + cpr, err := psc.CheckPermission(ctx, &v1.CheckPermissionRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, + }, + Resource: &v1.ObjectReference{ + ObjectType: "namespace", + ObjectId: "my_object_meta", + }, + Permission: "view", + Subject: &v1.SubjectReference{ + Object: &v1.ObjectReference{ + ObjectType: "user", + ObjectId: "janedoe", + }, + }, + }) + require.NoError(t, err) + require.Equal(t, v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION, cpr.Permissionship) + }) + } + +} diff --git a/pkg/proxy/distributedtx/workflows.go b/pkg/proxy/distributedtx/workflows.go index 7d98eb3..bdc3a9f 100644 --- a/pkg/proxy/distributedtx/workflows.go +++ b/pkg/proxy/distributedtx/workflows.go @@ -1,13 +1,13 @@ package distributedtx import ( - "context" "fmt" "net/http" "time" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/cespare/xxhash/v2" + "github.com/cschleiden/go-workflows/workflow" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/json" @@ -22,9 +22,7 @@ const ( DefaultLockMode = StrategyPessimisticWriteToSpiceDBAndKube StrategyOptimisticWriteToSpiceDBAndKube = "OptimisticWriteToSpiceDBAndKube" StrategyPessimisticWriteToSpiceDBAndKube = "PessimisticWriteToSpiceDBAndKube" - WriteToSpiceDB = "WriteToSpiceDBActivity" - WriteToKube = "WriteToKubeActivity" - CheckKube = "CheckKubeActivity" + DefaultWorkflowTimeout = time.Second * 5 ) var KubeBackoff = wait.Backoff{ @@ -34,24 +32,6 @@ var KubeBackoff = wait.Backoff{ Steps: MaxKubeAttempts, } -type TaskExecutor interface { - Call(name string, arg any) Task -} - -type TaskScheduler interface { - Schedule(ctx context.Context, orchestrator string, opts ...any) (string, error) - WaitForCompletion(ctx context.Context, id string) (*OrchestrationResult, error) -} - -type OrchestrationResult struct { - SerializedOutput []byte - Err string -} - -type Task interface { - Await(v any) error -} - type RollbackRelationships []*v1.Relationship func NewRollbackRelationships(rels ...*v1.Relationship) *RollbackRelationships { @@ -64,7 +44,7 @@ func (r *RollbackRelationships) WithRels(relationships ...*v1.Relationship) *Rol return r } -func (r *RollbackRelationships) Cleanup(executor TaskExecutor) { +func (r *RollbackRelationships) Cleanup(ctx workflow.Context) { updates := make([]*v1.RelationshipUpdate, 0, len(*r)) for _, rel := range *r { rel := rel @@ -73,12 +53,16 @@ func (r *RollbackRelationships) Cleanup(executor TaskExecutor) { Relationship: rel, }) } + // TODO: Should this be a separate workflow? + var ah ActivityHandler for { - var delResp v1.WriteRelationshipsResponse - if err := executor.Call(WriteToSpiceDB, &v1.WriteRelationshipsRequest{ - Updates: updates, - }).Await(&delResp); err != nil { + f := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx, + workflow.DefaultActivityOptions, + ah.WriteToSpiceDB, + &v1.WriteRelationshipsRequest{Updates: updates}) + + if _, err := f.Get(ctx); err != nil { fmt.Println("error deleting lock tuple", err) continue } @@ -90,64 +74,49 @@ func (r *RollbackRelationships) Cleanup(executor TaskExecutor) { // PessimisticWriteToSpiceDBAndKube ensures that a write exists in both SpiceDB // and kube, or neither, using locks. It prevents multiple users from writing // the same object/fields at the same time -func PessimisticWriteToSpiceDBAndKube(ctx IdentifiableExecutionInput, executor TaskExecutor) (any, error) { - var input CreateObjInput - if err := ctx.GetInput(&input); err != nil { - return nil, err - } - - if err := validateInput(input); err != nil { +func PessimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *CreateObjInput) (*KubeResp, error) { + if err := input.validate(); err != nil { return nil, fmt.Errorf("invalid input to PessimisticWriteToSpiceDBAndKube: %w", err) } + instance := workflow.WorkflowInstance(ctx) // this is hardcoded for namespaces for now; should be configurable and // come from the workflow input - spiceDBRelationship := SpiceDBNamespaceRel(input) - clusterRelationship := SpiceDBClusterRel(input) - lockTuple := LockRel(input, ctx.ID()) + resourceRel := SpiceDBNamespaceRel(input) + clusterRel := SpiceDBClusterRel(input) + resourceLockRel := ResourceLockRel(input, instance.InstanceID) // tuples to remove when the workflow is complete. // in some cases we will roll back the input, in all cases we remove // the lock when complete. - rollback := NewRollbackRelationships(lockTuple) - - var resp v1.WriteRelationshipsResponse - if err := executor.Call(WriteToSpiceDB, &v1.WriteRelationshipsRequest{ - OptionalPreconditions: []*v1.Precondition{{ - Operation: v1.Precondition_OPERATION_MUST_NOT_MATCH, - Filter: &v1.RelationshipFilter{ - ResourceType: lockResourceType, - OptionalResourceId: lockTuple.Resource.ObjectId, - OptionalRelation: lockRelationName, - OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: workflowResourceType}, - }, - }, { - Operation: v1.Precondition_OPERATION_MUST_NOT_MATCH, - Filter: &v1.RelationshipFilter{ - ResourceType: clusterRelationship.Resource.ObjectType, - OptionalResourceId: clusterRelationship.Resource.ObjectId, - OptionalRelation: clusterRelationship.Relation, - OptionalSubjectFilter: &v1.SubjectFilter{ - SubjectType: clusterRelationship.Subject.Object.ObjectType, - OptionalSubjectId: clusterRelationship.Subject.Object.ObjectType, - }, - }, - }}, + rollback := NewRollbackRelationships(resourceLockRel) + + arg := &v1.WriteRelationshipsRequest{ + OptionalPreconditions: []*v1.Precondition{ + resourceLockDoesNotExist(resourceLockRel), + clusterLockDoesNotExist(clusterRel), + }, Updates: []*v1.RelationshipUpdate{{ Operation: v1.RelationshipUpdate_OPERATION_TOUCH, - Relationship: spiceDBRelationship, + Relationship: resourceRel, }, { Operation: v1.RelationshipUpdate_OPERATION_TOUCH, - Relationship: lockTuple, + Relationship: resourceLockRel, }, { Operation: v1.RelationshipUpdate_OPERATION_TOUCH, - Relationship: clusterRelationship, - }}, - }).Await(&resp); err != nil { + Relationship: clusterRel, + }}} + + var ah ActivityHandler + _, err := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx, + workflow.DefaultActivityOptions, + ah.WriteToSpiceDB, + arg).Get(ctx) + if err != nil { // request failed for some reason fmt.Println("spicedb write failed", err) - rollback.WithRels(spiceDBRelationship, clusterRelationship).Cleanup(executor) + rollback.WithRels(resourceRel, clusterRel).Cleanup(ctx) // if the spicedb write fails, report it as a kube conflict error // we return this for any error, not just lock conflicts, so that the @@ -165,14 +134,13 @@ func PessimisticWriteToSpiceDBAndKube(ctx IdentifiableExecutionInput, executor T } for i := 0; i <= MaxKubeAttempts; i++ { // Attempt to write to kube - var out KubeResp - if err := executor.Call(WriteToKube, &KubeReqInput{ - RequestInfo: input.RequestInfo, - ObjectMeta: input.ObjectMeta, - Body: input.Body, - }).Await(&out); err != nil { + out, err := workflow.ExecuteActivity[*KubeResp](ctx, + workflow.DefaultActivityOptions, + ah.WriteToKube, + input.toKubeReqInput()).Get(ctx) + if err != nil { // didn't get a response from kube, try again - fmt.Println("kube write failed", err, out) + fmt.Println("kube write failed", err) time.Sleep(backoff.Step()) continue } @@ -184,40 +152,24 @@ func PessimisticWriteToSpiceDBAndKube(ctx IdentifiableExecutionInput, executor T } if out.StatusCode == http.StatusConflict || out.StatusCode == http.StatusCreated { - rollback.Cleanup(executor) + rollback.Cleanup(ctx) return out, nil } // some other status code is some other type of error, remove // the original tuple and the lock tuple - rollback.WithRels(spiceDBRelationship, clusterRelationship).Cleanup(executor) + rollback.WithRels(resourceRel, clusterRel).Cleanup(ctx) return out, nil } - rollback.WithRels(spiceDBRelationship, clusterRelationship).Cleanup(executor) - return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts", MaxKubeAttempts) -} - -func validateInput(input CreateObjInput) error { - if input.UserInfo.GetName() == "" { - return fmt.Errorf("missing user info in CreateObjectInput") - } - if input.ObjectMeta.Name == "" { - return fmt.Errorf("missing object meta in CreateObjectInput") - } - - return nil + rollback.WithRels(resourceRel, clusterRel).Cleanup(ctx) + return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts", MaxKubeAttempts) } // OptimisticWriteToSpiceDBAndKube ensures that a write exists in both SpiceDB and kube, // or neither. It attempts to perform the writes and rolls back if errors are // encountered, leaving the user to retry on write conflicts. -func OptimisticWriteToSpiceDBAndKube(ctx IdentifiableExecutionInput, executor TaskExecutor) (any, error) { - var input CreateObjInput - if err := ctx.GetInput(&input); err != nil { - return nil, err - } - +func OptimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *CreateObjInput) (*KubeResp, error) { // TODO: this could optionally use dry-run to preflight the kube request // this is hardcoded for namespaces for now; should be configurable and @@ -226,8 +178,7 @@ func OptimisticWriteToSpiceDBAndKube(ctx IdentifiableExecutionInput, executor Ta clusterRelationship := SpiceDBClusterRel(input) rollback := NewRollbackRelationships(spiceDBRelationship, clusterRelationship) - var resp v1.WriteRelationshipsResponse - if err := executor.Call(WriteToSpiceDB, &v1.WriteRelationshipsRequest{ + arg := &v1.WriteRelationshipsRequest{ Updates: []*v1.RelationshipUpdate{{ Operation: v1.RelationshipUpdate_OPERATION_CREATE, Relationship: spiceDBRelationship, @@ -235,44 +186,49 @@ func OptimisticWriteToSpiceDBAndKube(ctx IdentifiableExecutionInput, executor Ta Operation: v1.RelationshipUpdate_OPERATION_CREATE, Relationship: clusterRelationship, }}, - }).Await(&resp); err != nil { - rollback.Cleanup(executor) - fmt.Println("WRITE ERR", err) + } + var ah ActivityHandler + _, err := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx, + workflow.DefaultActivityOptions, + ah.WriteToSpiceDB, + arg).Get(ctx) + if err != nil { + rollback.Cleanup(ctx) + fmt.Println("SpiceDB WRITE ERR", err) // report spicedb write errors as conflicts return KubeConflict(err, input), nil } - var out KubeResp - if err := executor.Call(WriteToKube, &KubeReqInput{ - RequestInfo: input.RequestInfo, - ObjectMeta: input.ObjectMeta, - Body: input.Body, - }).Await(&out); err != nil { + out, err := workflow.ExecuteActivity[*KubeResp](ctx, + workflow.DefaultActivityOptions, + ah.WriteToKube, + input.toKubeReqInput()).Get(ctx) + if err != nil { // if there's an error, might need to roll back the spicedb write // check if object exists - we might have failed the write task but // succeeded in writing to kube - var exists bool - if err := executor.Call(CheckKube, &KubeReqInput{ - RequestInfo: input.RequestInfo, - ObjectMeta: input.ObjectMeta, - Body: input.Body, - }).Await(&exists); err != nil { + exists, err := workflow.ExecuteActivity[bool](ctx, + workflow.DefaultActivityOptions, + ah.CheckKube, + input.toKubeReqInput()).Get(ctx) + if err != nil { return nil, err } // if the object doesn't exist, clean up the spicedb write if !exists { - rollback.Cleanup(executor) + rollback.Cleanup(ctx) return nil, err } } + return out, nil } -// LockRel generates a relationship representing a worfklow's lock over a +// ResourceLockRel generates a relationship representing a worfklow's lock over a // specific resource in kube. -func LockRel(input CreateObjInput, id string) *v1.Relationship { +func ResourceLockRel(input *CreateObjInput, workflowID string) *v1.Relationship { lockKey := input.RequestInfo.Path + "/" + input.ObjectMeta.GetName() + "/" + input.RequestInfo.Verb lockHash := fmt.Sprintf("%x", xxhash.Sum64String(lockKey)) return &v1.Relationship{ @@ -284,7 +240,7 @@ func LockRel(input CreateObjInput, id string) *v1.Relationship { Subject: &v1.SubjectReference{ Object: &v1.ObjectReference{ ObjectType: workflowResourceType, - ObjectId: id, + ObjectId: workflowID, }, }, } @@ -293,7 +249,7 @@ func LockRel(input CreateObjInput, id string) *v1.Relationship { // SpiceDBNamespaceRel returns a tuple to write when creating a namespace. // This is hardcoded for namespaces for now; should be configurable and // come from the workflow input -func SpiceDBNamespaceRel(input CreateObjInput) *v1.Relationship { +func SpiceDBNamespaceRel(input *CreateObjInput) *v1.Relationship { return &v1.Relationship{ Resource: &v1.ObjectReference{ ObjectType: "namespace", @@ -312,7 +268,7 @@ func SpiceDBNamespaceRel(input CreateObjInput) *v1.Relationship { // SpiceDBClusterRel returns a tuple to write when creating a namespace. // This is hardcoded for namespaces for now; should be configurable and // come from the workflow input -func SpiceDBClusterRel(input CreateObjInput) *v1.Relationship { +func SpiceDBClusterRel(input *CreateObjInput) *v1.Relationship { return &v1.Relationship{ Resource: &v1.ObjectReference{ ObjectType: "namespace", @@ -330,7 +286,7 @@ func SpiceDBClusterRel(input CreateObjInput) *v1.Relationship { // KubeConflict wraps an error and turns it into a standard kube conflict // response. -func KubeConflict(err error, input CreateObjInput) KubeResp { +func KubeConflict(err error, input *CreateObjInput) *KubeResp { var out KubeResp statusError := k8serrors.NewConflict(schema.GroupResource{ Group: input.RequestInfo.APIGroup, @@ -339,5 +295,32 @@ func KubeConflict(err error, input CreateObjInput) KubeResp { out.StatusCode = http.StatusConflict out.Err = *statusError out.Body, _ = json.Marshal(statusError) - return out + return &out +} + +func resourceLockDoesNotExist(lockRel *v1.Relationship) *v1.Precondition { + return &v1.Precondition{ + Operation: v1.Precondition_OPERATION_MUST_NOT_MATCH, + Filter: &v1.RelationshipFilter{ + ResourceType: lockResourceType, + OptionalResourceId: lockRel.Resource.ObjectId, + OptionalRelation: lockRelationName, + OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: workflowResourceType}, + }, + } +} + +func clusterLockDoesNotExist(clusterRelationship *v1.Relationship) *v1.Precondition { + return &v1.Precondition{ + Operation: v1.Precondition_OPERATION_MUST_NOT_MATCH, + Filter: &v1.RelationshipFilter{ + ResourceType: clusterRelationship.Resource.ObjectType, + OptionalResourceId: clusterRelationship.Resource.ObjectId, + OptionalRelation: clusterRelationship.Relation, + OptionalSubjectFilter: &v1.SubjectFilter{ + SubjectType: clusterRelationship.Subject.Object.ObjectType, + OptionalSubjectId: clusterRelationship.Subject.Object.ObjectType, + }, + }, + } } diff --git a/pkg/proxy/durabletask/durabletask.go b/pkg/proxy/durabletask/durabletask.go deleted file mode 100644 index 0792710..0000000 --- a/pkg/proxy/durabletask/durabletask.go +++ /dev/null @@ -1,131 +0,0 @@ -package durabletask - -import ( - "context" - - "github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx" - - v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/microsoft/durabletask-go/api" - "github.com/microsoft/durabletask-go/backend" - "github.com/microsoft/durabletask-go/backend/sqlite" - "github.com/microsoft/durabletask-go/task" - "k8s.io/client-go/rest" -) - -const MemorySQLite = "" - -type TaskWorker interface { - Start(context.Context) error - Shutdown(context.Context) error -} - -func Setup(permissionClient v1.PermissionsServiceClient, kubeClient rest.Interface, sqlitePath string) (distributedtx.TaskScheduler, TaskWorker, error) { - h := TaskHandler{distributedtx.Handler{PermissionClient: permissionClient, KubeClient: kubeClient}} - - // durabletask - stores planned dual writes in a sqlite db - logger := backend.DefaultLogger() - r := task.NewTaskRegistry() - if err := r.AddOrchestratorN(distributedtx.StrategyPessimisticWriteToSpiceDBAndKube, PessimisticWriteToSpiceDBAndKube); err != nil { - return nil, nil, err - } - if err := r.AddOrchestratorN(distributedtx.StrategyOptimisticWriteToSpiceDBAndKube, OptimisticWriteToSpiceDBAndKube); err != nil { - return nil, nil, err - } - if err := r.AddActivityN(distributedtx.WriteToSpiceDB, h.WriteToSpiceDB); err != nil { - return nil, nil, err - } - if err := r.AddActivityN(distributedtx.WriteToKube, h.WriteToKube); err != nil { - return nil, nil, err - } - if err := r.AddActivityN(distributedtx.CheckKube, h.CheckKube); err != nil { - return nil, nil, err - } - - // note: can use the in-memory sqlite provider by specifying "" - be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(sqlitePath), logger) - executor := task.NewTaskExecutor(r) - orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger) - activityWorker := backend.NewActivityTaskWorker(be, executor, logger) - worker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger) - - taskScheduler := TaskScheduler{ - taskHubClient: backend.NewTaskHubClient(be), - } - return taskScheduler, worker, nil -} - -func PessimisticWriteToSpiceDBAndKube(ctx *task.OrchestrationContext) (any, error) { - ec := ExecutionContext{ctx: ctx} - return distributedtx.PessimisticWriteToSpiceDBAndKube(ec, ec) -} - -func OptimisticWriteToSpiceDBAndKube(ctx *task.OrchestrationContext) (any, error) { - ec := ExecutionContext{ctx: ctx} - return distributedtx.OptimisticWriteToSpiceDBAndKube(ec, ec) -} - -type ExecutionContext struct { - ctx *task.OrchestrationContext -} - -func (e ExecutionContext) Call(name string, arg any) distributedtx.Task { - return e.ctx.CallActivity(name, task.WithActivityInput(arg)) -} - -func (e ExecutionContext) GetInput(resultPtr any) error { - return e.ctx.GetInput(resultPtr) -} - -func (e ExecutionContext) ID() string { - return string(e.ctx.ID) -} - -type TaskScheduler struct { - taskHubClient backend.TaskHubClient -} - -func (t TaskScheduler) Schedule(ctx context.Context, orchestrator string, opts ...any) (string, error) { - var orchestrationOpts []api.NewOrchestrationOptions - for _, opt := range opts { - orchestrationOpts = append(orchestrationOpts, api.WithInput(opt)) - } - - instanceID, err := t.taskHubClient.ScheduleNewOrchestration(ctx, orchestrator, orchestrationOpts...) - if err != nil { - return string(instanceID), err - } - - return string(instanceID), nil -} - -func (t TaskScheduler) WaitForCompletion(ctx context.Context, id string) (*distributedtx.OrchestrationResult, error) { - metadata, err := t.taskHubClient.WaitForOrchestrationCompletion(ctx, api.InstanceID(id)) - if err != nil { - return nil, err - } - var errMsg string - if metadata.FailureDetails != nil { - errMsg = metadata.FailureDetails.ErrorMessage - } - return &distributedtx.OrchestrationResult{ - SerializedOutput: []byte(metadata.SerializedOutput), - Err: errMsg, - }, nil -} - -type TaskHandler struct { - distributedtx.Handler -} - -func (th TaskHandler) WriteToSpiceDB(ctx task.ActivityContext) (any, error) { - return th.Handler.WriteToSpiceDB(ctx) -} - -func (th TaskHandler) WriteToKube(ctx task.ActivityContext) (any, error) { - return th.Handler.WriteToKube(ctx) -} - -func (th TaskHandler) CheckKube(ctx task.ActivityContext) (any, error) { - return th.Handler.CheckKube(ctx) -} diff --git a/pkg/proxy/durabletask/durabletask_test.go b/pkg/proxy/durabletask/durabletask_test.go deleted file mode 100644 index 9010a33..0000000 --- a/pkg/proxy/durabletask/durabletask_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package durabletask - -import ( - "context" - "io" - "net/http" - "strings" - "testing" - - "github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx" - "github.com/authzed/spicedb-kubeapi-proxy/pkg/spicedb" - - v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/client-go/rest/fake" -) - -func TestDurableTask(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - srv, err := spicedb.NewServer(ctx) - require.NoError(t, err) - go func() { - require.NoError(t, srv.Run(ctx)) - }() - - dialCtx, err := srv.GRPCDialContext(ctx) - require.NoError(t, err) - - psc := v1.NewPermissionsServiceClient(dialCtx) - - kubeClient := &fake.RESTClient{ - Client: fake.CreateHTTPClient(func(request *http.Request) (*http.Response, error) { - header := http.Header{} - header.Set("Content-Type", runtime.ContentTypeJSON) - resp := &http.Response{ - Header: header, - StatusCode: http.StatusCreated, - Body: io.NopCloser(strings.NewReader("{}")), - } - return resp, nil - }), - NegotiatedSerializer: &serializer.CodecFactory{}, - } - - scheduler, worker, err := Setup(psc, kubeClient, MemorySQLite) - require.NoError(t, err) - require.NoError(t, worker.Start(ctx)) - defer func() { - require.NoError(t, worker.Shutdown(ctx)) - }() - - id, err := scheduler.Schedule(ctx, distributedtx.DefaultLockMode, &distributedtx.CreateObjInput{ - RequestInfo: &request.RequestInfo{}, - UserInfo: &user.DefaultInfo{Name: "janedoe"}, - ObjectMeta: &metav1.ObjectMeta{Name: "my_object_meta"}, - Body: []byte("{}"), - }) - require.NoError(t, err) - - metadata, err := scheduler.WaitForCompletion(ctx, id) - require.NoError(t, err) - require.Empty(t, metadata.Err, "workflow returned error: %s", metadata.Err) - - resp, err := psc.CheckPermission(ctx, &v1.CheckPermissionRequest{ - Consistency: &v1.Consistency{ - Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, - }, - Resource: &v1.ObjectReference{ - ObjectType: "namespace", - ObjectId: "my_object_meta", - }, - Permission: "view", - Subject: &v1.SubjectReference{ - Object: &v1.ObjectReference{ - ObjectType: "user", - ObjectId: "janedoe", - }, - }, - }) - require.NoError(t, err) - require.Equal(t, v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION, resp.Permissionship) -} diff --git a/pkg/proxy/options.go b/pkg/proxy/options.go index bc76f2b..29e870e 100644 --- a/pkg/proxy/options.go +++ b/pkg/proxy/options.go @@ -29,8 +29,9 @@ import ( ) const ( - defaultDurableTaskDatabasePath = "/tmp/dtx.sqlite" - EmbeddedSpiceDBEndpoint = "embedded://" + defaultWorkflowDatabasePath = "/tmp/dtx.sqlite" + EmbeddedSpiceDBEndpoint = "embedded://" + defaultDialerTimeout = 5 * time.Second ) type Options struct { @@ -55,8 +56,8 @@ type Options struct { skipVerifyCA bool token string - DurableTaskDatabasePath string - LockMode string + WorkflowDatabasePath string + LockMode string } func NewOptions() *Options { @@ -75,7 +76,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { o.SecureServing.AddFlags(fs) o.Authentication.AddFlags(fs) logsv1.AddFlags(o.Logs, fs) - fs.StringVar(&o.DurableTaskDatabasePath, "durabletask-database-path", defaultDurableTaskDatabasePath, "Path for the file representing the SQLite database used for the durable task engine.") + fs.StringVar(&o.WorkflowDatabasePath, "workflow-database-path", defaultWorkflowDatabasePath, "Path for the file representing the SQLite database used for the workflow engine.") fs.StringVar(&o.BackendKubeconfigPath, "backend-kubeconfig", o.BackendKubeconfigPath, "The path to the kubeconfig to proxy connections to. It should authenticate the user with cluster-admin permission.") fs.StringVar(&o.SpiceDBEndpoint, "spicedb-endpoint", "localhost:50051", "Defines the endpoint endpoint to the SpiceDB authorizing proxy operations. if embedded:// is specified, an in memory ephemeral instance created.") fs.BoolVar(&o.insecure, "spicedb-insecure", false, "If set to true uses the insecure transport configuration for gRPC. Set to false by default.") @@ -165,7 +166,7 @@ func (o *Options) Complete(ctx context.Context) error { } opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig})) - timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, defaultDialerTimeout) defer cancel() conn, err = grpc.DialContext(timeoutCtx, o.SpiceDBEndpoint, opts...) if err != nil { diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index c35c78d..9372cf7 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -11,7 +11,6 @@ import ( "time" "github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/distributedtx" - "github.com/authzed/spicedb-kubeapi-proxy/pkg/proxy/durabletask" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,10 +30,10 @@ import ( ) type Server struct { - opts Options - Handler http.Handler - TaskWorker durabletask.TaskWorker - KubeClient *kubernetes.Clientset + opts Options + Handler http.Handler + WorkflowWorker *distributedtx.Worker + KubeClient *kubernetes.Clientset // LockMode references the name of the workflow to run for dual writes // This is very temporary, and should be replaced with per-request @@ -42,15 +41,12 @@ type Server struct { LockMode string } -var defaultDistributedTransactionProvider = durabletask.Setup - func NewServer(ctx context.Context, o Options) (*Server, error) { s := &Server{ opts: o, LockMode: distributedtx.DefaultLockMode, } - klog.FromContext(ctx).Info("starting durable task engine") restConfig, err := clientcmd.NewDefaultClientConfig(*s.opts.BackendConfig, nil).ClientConfig() if err != nil { return nil, err @@ -112,13 +108,16 @@ func NewServer(ctx context.Context, o Options) (*Server, error) { codecs := serializer.NewCodecFactory(scheme) failHandler := genericapifilters.Unauthorized(codecs) - scheduler, worker, err := defaultDistributedTransactionProvider(s.opts.PermissionsClient, s.KubeClient.RESTClient(), durabletask.MemorySQLite) + workflowClient, worker, err := distributedtx.SetupWithSQLiteBackend(ctx, + s.opts.PermissionsClient, + s.KubeClient.RESTClient(), + o.WorkflowDatabasePath) if err != nil { return nil, fmt.Errorf("failed to initialize distributed transaction handling: %w", err) } - s.TaskWorker = worker + s.WorkflowWorker = worker - handler, err := WithAuthorization(clusterProxy, failHandler, o.PermissionsClient, o.WatchClient, scheduler, &s.LockMode) + handler, err := WithAuthorization(clusterProxy, failHandler, o.PermissionsClient, o.WatchClient, workflowClient, &s.LockMode) if err != nil { return nil, fmt.Errorf("unable to create authorization handler: %w", err) } @@ -156,12 +155,12 @@ func (s *Server) Run(ctx context.Context) error { } go func() { - if err := s.TaskWorker.Start(ctx); err != nil { - klog.FromContext(ctx).Error(err, "failed to run durable task worker") + if err := s.WorkflowWorker.Start(ctx); err != nil { + klog.FromContext(ctx).Error(err, "failed to run workflow worker") cancel() return } - klog.FromContext(ctx).Info("task hub worker started") + klog.FromContext(ctx).Info("workflow worker started") }() doneCh, _, err := s.opts.ServingInfo.Serve(s.Handler, time.Second*60, ctx.Done()) if err != nil { @@ -172,7 +171,7 @@ func (s *Server) Run(ctx context.Context) error { ctx, cancel = context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - if err := s.TaskWorker.Shutdown(ctx); err != nil { + if err := s.WorkflowWorker.Shutdown(ctx); err != nil { return err } return nil