From 7e7907ef1272d51811cab4cfb6fce71cf730505c Mon Sep 17 00:00:00 2001 From: anrs Date: Fri, 18 Dec 2020 14:01:15 +0800 Subject: [PATCH] feat: core WAL (#303) Co-authored-by: anrs --- go.mod | 1 + go.sum | 29 +----- types/config.go | 3 + types/errors.go | 3 + wal/hydro.go | 137 +++++++++++++++++++++++++ wal/hydro_event.go | 55 ++++++++++ wal/hydro_test.go | 226 +++++++++++++++++++++++++++++++++++++++++ wal/kv/kv.go | 44 ++++++++ wal/kv/lithium.go | 173 +++++++++++++++++++++++++++++++ wal/kv/lithium_test.go | 81 +++++++++++++++ wal/kv/mocked.go | 134 ++++++++++++++++++++++++ wal/mocks/WAL.go | 48 +++++++++ wal/wal.go | 91 +++++++++++++++++ wal/wal_test.go | 67 ++++++++++++ 14 files changed, 1065 insertions(+), 27 deletions(-) create mode 100644 wal/hydro.go create mode 100644 wal/hydro_event.go create mode 100644 wal/hydro_test.go create mode 100644 wal/kv/kv.go create mode 100644 wal/kv/lithium.go create mode 100644 wal/kv/lithium_test.go create mode 100644 wal/kv/mocked.go create mode 100644 wal/mocks/WAL.go create mode 100644 wal/wal.go create mode 100644 wal/wal_test.go diff --git a/go.mod b/go.mod index fdf7f2023..e4e713561 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/CMGS/statsd v0.0.0-20160223095033-48c421b3c1ab github.com/Microsoft/hcsshim v0.8.11 // indirect github.com/alexcesaro/statsd v2.0.0+incompatible // indirect + github.com/boltdb/bolt v1.3.1 github.com/cenkalti/backoff/v4 v4.0.2 github.com/containerd/containerd v1.4.3 // indirect github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe // indirect diff --git a/go.sum b/go.sum index e9df1aff6..71d721d32 100644 --- a/go.sum +++ b/go.sum @@ -27,18 +27,14 @@ github.com/Microsoft/hcsshim v0.8.11 h1:qs8+XI1mFA1H/zhXT9qVG/lcJO18p1yCsICIrCjV github.com/Microsoft/hcsshim v0.8.11/go.mod h1:NtVKoYxQuTLx6gEq0L96c9Ju4JbRJ4nY2ow3VK6a9Lg= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= -github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alexcesaro/statsd v2.0.0+incompatible h1:HG17k1Qk8V1F4UOoq6tx+IUoAbOcI5PHzzEUGeDD72w= github.com/alexcesaro/statsd v2.0.0+incompatible/go.mod h1:vNepIbQAiyLe1j480173M6NYYaAsGwEcvuDTU3OCUGY= -github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -46,6 +42,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -54,7 +52,6 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cilium/ebpf v0.0.0-20200110133405-4032b1d8aae3/go.mod h1:MA5e5Lr8slmEg9bt0VpxxWqJlO4iwu3FBdHUzV7wQVg= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59 h1:qWj4qVYZ95vLWwqyNJCQg7rDsG5wPdze0UaPolH7DUk= @@ -65,8 +62,6 @@ github.com/containerd/containerd v1.4.3 h1:ijQT13JedHSHrQGWFcGEwzcNKrAGIiZ+jSD5Q github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe h1:PEmIrUvwG9Yyv+0WKZqjXfSFDeZjs/q15g0m08BYS9k= -github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe h1:PEmIrUvwG9Yyv+0WKZqjXfSFDeZjs/q15g0m08BYS9k= -github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe/go.mod h1:cECdGN1O8G9bgKTlLhuPJimka6Xb/Gg7vYzCTNVxhvo= github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe/go.mod h1:cECdGN1O8G9bgKTlLhuPJimka6Xb/Gg7vYzCTNVxhvo= github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448/go.mod h1:ODA38xgv3Kuk8dQz2ZQXpnv/UZZUHUCL7pnLehbXgQI= github.com/containerd/go-runc v0.0.0-20180907222934-5a6d9f37cfa3/go.mod h1:IV7qH3hrUgRmyYrtgEeGWJfWbgcHL9CSRruz2Vqcph0= @@ -84,13 +79,11 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBt github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -110,9 +103,7 @@ github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQ github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU5CAUmr9zpesgbU6SWc8/B4mflAE4= github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= -github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -127,7 +118,6 @@ github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHj github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= -github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= @@ -136,16 +126,13 @@ github.com/getsentry/sentry-go v0.9.0/go.mod h1:kELm/9iCblqUYh+ZRML7PNdCvEuw24wB github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM= -github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0= github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= -github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-git/gcfg v1.5.0 h1:Q5ViNfGF8zFgyJWPqYwA7qGFoMTEiBmdlkcfRmpIMa4= github.com/go-git/gcfg v1.5.0/go.mod h1:5m20vg6GwYabIxaOonVkTdrILxQMpEShl1xiMF4ua+E= github.com/go-git/go-billy/v5 v5.0.0 h1:7NQHvd9FVid8VL4qVUMm8XifBK+2xCoZ2lSk0agRrHM= github.com/go-git/go-billy/v5 v5.0.0/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0= -github.com/go-git/go-git-fixtures/v4 v4.0.1 h1:q+IFMfLx200Q3scvt2hN79JsEzy4AmBTp/pqnefH+Bc= github.com/go-git/go-git-fixtures/v4 v4.0.1/go.mod h1:m+ICp2rF3jDhFgEZ/8yziagdT1C+ZpZcrJjappBCDSw= github.com/go-git/go-git/v5 v5.1.0 h1:HxJn9g/E7eYvKW3Fm7Jt4ee8LXfPOm/H1cdDu8vEssk= github.com/go-git/go-git/v5 v5.1.0/go.mod h1:ZKfuPUoY1ZqIG4QG9BDBh3G4gLM5zvPuSJAozQrZuyM= @@ -191,7 +178,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -208,7 +194,6 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c h1:Lh2aW+HnU2Nbe1gqD9SOJLJxW1jBMmQOktN2acDyJk8= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -271,7 +256,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= @@ -311,7 +295,6 @@ github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5Vgl github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -332,7 +315,6 @@ github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20171018195549-f15c970de5b7/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -418,7 +400,6 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.2 h1:gsqYFH8bb9ekPA12kRo0hfjngWQjkJPlN9R0N78BoUo= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -457,7 +438,6 @@ go.uber.org/automaxprocs v1.3.0/go.mod h1:9CWT6lKIep8U41DDaPiH6eFscnTyjfTANNQNx6 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= @@ -603,7 +583,6 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFq golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -642,12 +621,10 @@ google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyz google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= -gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc= gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -671,9 +648,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= -gotest.tools/v3 v3.0.2 h1:kG1BFyqVHuQoVQiR1bWGnfz/fmHvvuiSPIV7rvl360E= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/types/config.go b/types/config.go index 84be4a8c7..eb905ec2b 100644 --- a/types/config.go +++ b/types/config.go @@ -16,6 +16,9 @@ type Config struct { Auth AuthConfig `yaml:"auth"` // grpc auth GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config + WALFile string `yaml:"wal_file" required:"true" default:"core.wal"` // WAL file path + WALOpenTimeout time.Duration `yaml:"wal_open_timeout" required:"true" default:"8s"` // timeout for opening a WAL file + Git GitConfig `yaml:"git"` Etcd EtcdConfig `yaml:"etcd"` Docker DockerConfig `yaml:"docker"` diff --git a/types/errors.go b/types/errors.go index a6d1c156d..47dcf8b3e 100644 --- a/types/errors.go +++ b/types/errors.go @@ -80,6 +80,9 @@ var ( ErrNodeNotExists = errors.New("node not exists") ErrWorkloadNotExists = errors.New("workload not exists") + + ErrUnregisteredWALEventType = errors.New("unregistered WAL event type") + ErrInvalidWALBucket = errors.New("invalid WAL bucket") ) // NewDetailedErr returns an error with details diff --git a/wal/hydro.go b/wal/hydro.go new file mode 100644 index 000000000..3eb7d4bc3 --- /dev/null +++ b/wal/hydro.go @@ -0,0 +1,137 @@ +package wal + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + coretypes "github.com/projecteru2/core/types" + "github.com/projecteru2/core/wal/kv" +) + +// Hydro is the simplest wal implementation. +type Hydro struct { + handlers sync.Map + kv kv.KV +} + +// NewHydro initailizes a new Hydro instance. +func NewHydro() *Hydro { + return &Hydro{ + kv: kv.NewLithium(), + } +} + +// Open connects a kvdb. +func (h *Hydro) Open(ctx context.Context, path string, timeout time.Duration) (err error) { + err = h.kv.Open(ctx, path, 0600, timeout) + return +} + +// Close disconnects the kvdb. +func (h *Hydro) Close(ctx context.Context) error { + return h.kv.Close(ctx) +} + +// Register registers a new event handler. +func (h *Hydro) Register(handler EventHandler) { + h.handlers.Store(handler.Event, handler) +} + +// Recover starts a disaster recovery, which will replay all the events. +func (h *Hydro) Recover(ctx context.Context) { + for ent := range h.kv.Scan(ctx, []byte(EventPrefix)) { + event, err := h.decodeEvent(ent) + if err != nil { + log.Errorf("[Recover] decode event error: %v", err) + continue + } + + handler, ok := h.getEventHandler(event.Type) + if !ok { + log.Errorf("[Recover] no such event handler for %s", event.Type) + continue + } + + if err := h.recover(ctx, handler, event); err != nil { + log.Errorf("[Recover] handle event %d (%s) failed: %v", event.ID, event.Type, err) + continue + } + } +} + +func (h *Hydro) recover(ctx context.Context, handler EventHandler, event HydroEvent) error { + item, err := handler.Decode(event.Item) + if err != nil { + return err + } + + switch handle, err := handler.Check(item); { + case err != nil: + return err + case !handle: + return event.Delete(ctx) + } + + return handler.Handle(item) +} + +// Log records a log item. +func (h *Hydro) Log(ctx context.Context, eventype string, item interface{}) (Commit, error) { + handler, ok := h.getEventHandler(eventype) + if !ok { + return nil, coretypes.NewDetailedErr(coretypes.ErrUnregisteredWALEventType, eventype) + } + + bs, err := handler.Encode(item) + if err != nil { + return nil, err + } + + event := NewHydroEvent(h.kv) + event.Type = eventype + event.Item = bs + + if err = event.Create(ctx); err != nil { + return nil, err + } + + commit := func(context.Context) error { + return event.Delete(ctx) + } + + return commit, nil +} + +func (h *Hydro) getEventHandler(event string) (handler EventHandler, ok bool) { + var raw interface{} + if raw, ok = h.handlers.Load(event); !ok { + return + } + + handler, ok = raw.(EventHandler) + + return +} + +func (h *Hydro) decodeEvent(ent kv.ScanEntry) (event HydroEvent, err error) { + if err = ent.Error(); err != nil { + return + } + + key, value := ent.Pair() + if err = json.Unmarshal(value, &event); err != nil { + return + } + + event.kv = h.kv + + event.ID, err = strconv.ParseUint(strings.TrimPrefix(string(key), EventPrefix), 10, 64) + + return +} diff --git a/wal/hydro_event.go b/wal/hydro_event.go new file mode 100644 index 000000000..72bd5d9bd --- /dev/null +++ b/wal/hydro_event.go @@ -0,0 +1,55 @@ +package wal + +import ( + "context" + "encoding/json" + "path/filepath" + "strconv" + + "github.com/projecteru2/core/wal/kv" +) + +// HydroEvent indicates a log event. +type HydroEvent struct { + // A global unique identifier. + ID uint64 `json:"id"` + + // Registered event type name. + Type string `json:"type"` + + // The encoded log item. + Item []byte `json:"item"` + + kv kv.KV +} + +// NewHydroEvent initializes a new HydroEvent instance. +func NewHydroEvent(kv kv.KV) (e *HydroEvent) { + e = &HydroEvent{} + e.kv = kv + return +} + +// Create persists this event. +func (e *HydroEvent) Create(ctx context.Context) (err error) { + if e.ID, err = e.kv.NextSequence(ctx); err != nil { + return + } + + var value []byte + if value, err = json.MarshalIndent(e, "", "\t"); err != nil { + return err + } + + return e.kv.Put(ctx, e.Key(), value) +} + +// Delete removes this event from persistence. +func (e HydroEvent) Delete(ctx context.Context) error { + return e.kv.Delete(ctx, e.Key()) +} + +// Key returns this event's key path. +func (e HydroEvent) Key() []byte { + return []byte(filepath.Join(EventPrefix, strconv.FormatUint(e.ID, 10))) +} diff --git a/wal/hydro_test.go b/wal/hydro_test.go new file mode 100644 index 000000000..edae9ae3f --- /dev/null +++ b/wal/hydro_test.go @@ -0,0 +1,226 @@ +package wal + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/projecteru2/core/wal/kv" +) + +func TestLogFailedAsNoSuchHandler(t *testing.T) { + hydro := NewHydro() + commit, err := hydro.Log(context.Background(), "create", struct{}{}) + require.Error(t, err) + require.Nil(t, commit) +} + +func TestLogFailedAsEncodeError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Encode = func(interface{}) ([]byte, error) { return nil, fmt.Errorf("encode error") } + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.Error(t, err) + require.Nil(t, commit) + require.False(t, encoded) + require.False(t, checked) + require.False(t, decoded) + require.False(t, handled) +} + +func TestLogWithCommitEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + require.NoError(t, commit(context.Background())) + require.True(t, encoded) + require.False(t, decoded) + require.False(t, checked) + require.False(t, handled) +} + +func TestRecoverFailedAsNoSuchHandler(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.handlers.Delete(eventype) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.False(t, decoded) + require.False(t, checked) + require.False(t, handled) +} + +func TestRecoverFailedAsCheckError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Check = func(interface{}) (bool, error) { + checked = true + return false, fmt.Errorf("check error") + } + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.True(t, checked) + require.False(t, handled) +} + +func TestDecodeEventFailedAsDecodeEntryError(t *testing.T) { + hydro := NewHydro() + ent := kv.MockedScanEntry{Value: []byte("x")} + _, err := hydro.decodeEvent(ent) + require.Error(t, err) +} + +func TestDecodeEventFailedAsInvalidEventID(t *testing.T) { + hydro := NewHydro() + ent := kv.MockedScanEntry{Key: "/events/x", Value: []byte("{}")} + _, err := hydro.decodeEvent(ent) + require.Error(t, err) +} + +func TestDecodeEventFailedAsEntryError(t *testing.T) { + hydro := NewHydro() + expErr := fmt.Errorf("expects an error") + ent := kv.MockedScanEntry{Err: expErr} + _, err := hydro.decodeEvent(ent) + require.Error(t, err) + require.Equal(t, expErr.Error(), err.Error()) +} + +func TestRecoverFailedAsDecodeLogError(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Decode = func([]byte) (interface{}, error) { + decoded = true + return nil, fmt.Errorf("decode error") + } + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.False(t, checked) + require.False(t, handled) +} + +func TestHydroRecoverDiscardNoNeedEvent(t *testing.T) { + var checked, handled, encoded, decoded bool + check := func(interface{}) (need bool, err error) { + checked = true + return + } + + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + handler.Check = check + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.True(t, checked) + require.False(t, handled) +} + +func TestHydroRecover(t *testing.T) { + var checked, handled, encoded, decoded bool + eventype := "create" + handler := newTestEventHandler(eventype, &checked, &handled, &encoded, &decoded) + + hydro := NewHydro() + hydro.kv = kv.NewMockedKV() + hydro.Register(handler) + + commit, err := hydro.Log(context.Background(), eventype, struct{}{}) + require.NoError(t, err) + require.NotNil(t, commit) + + hydro.Recover(context.Background()) + require.True(t, encoded) + require.True(t, decoded) + require.True(t, checked) + require.True(t, handled) +} + +func newTestEventHandler(eventype string, checked, handled, encoded, decoded *bool) EventHandler { + check := func(interface{}) (bool, error) { + *checked = true + return true, nil + } + + handle := func(interface{}) (err error) { + *handled = true + return + } + + encode := func(interface{}) (bs []byte, err error) { + *encoded = true + return + } + + decode := func([]byte) (item interface{}, err error) { + *decoded = true + return + } + + return EventHandler{ + Event: eventype, + Encode: encode, + Decode: decode, + Check: check, + Handle: handle, + } +} diff --git a/wal/kv/kv.go b/wal/kv/kv.go new file mode 100644 index 000000000..ed309e9e2 --- /dev/null +++ b/wal/kv/kv.go @@ -0,0 +1,44 @@ +package kv + +import ( + "context" + "os" + "time" +) + +// KV is the interface that groups the Simpler and Scanner interfaces. +type KV interface { + OpenCloser + Simpler + Scanner + Sequencer +} + +// Simpler is the interface that groups the basic Put, Get and Delete methods. +type Simpler interface { + Put(context.Context, []byte, []byte) error + Get(context.Context, []byte) ([]byte, error) + Delete(context.Context, []byte) error +} + +// Scanner is the interface that wraps the basic Scan method. +type Scanner interface { + Scan(context.Context, []byte) <-chan ScanEntry +} + +// Sequencer is the interface that wraps the basic NextSequence method. +type Sequencer interface { + NextSequence(context.Context) (id uint64, err error) +} + +// OpenCloser is the interface that groups the basic Open and Close methods. +type OpenCloser interface { + Open(ctx context.Context, path string, mode os.FileMode, timeout time.Duration) error + Close(context.Context) error +} + +// ScanEntry is the interface that groups the basic Pair and Error methods. +type ScanEntry interface { + Pair() (key []byte, value []byte) + Error() error +} diff --git a/wal/kv/lithium.go b/wal/kv/lithium.go new file mode 100644 index 000000000..c9ee4c9da --- /dev/null +++ b/wal/kv/lithium.go @@ -0,0 +1,173 @@ +package kv + +import ( + "bytes" + "context" + "os" + "sync" + "time" + + "github.com/boltdb/bolt" + + "github.com/projecteru2/core/types" +) + +// Lithium . +type Lithium struct { + sync.Mutex + + // Name of the root bucket. + RootBucketKey []byte + + bolt *bolt.DB +} + +// NewLithium initializes a new Lithium instance. +func NewLithium() *Lithium { + return &Lithium{ + RootBucketKey: []byte("root"), + } +} + +// Open opens a kvdb file. +func (l *Lithium) Open(ctx context.Context, path string, mode os.FileMode, timeout time.Duration) (err error) { + l.Lock() + defer l.Unlock() + + if l.bolt, err = bolt.Open(path, mode, &bolt.Options{Timeout: timeout}); err != nil { + return + } + + err = l.bolt.Update(func(tx *bolt.Tx) error { + _, ce := tx.CreateBucketIfNotExists(l.RootBucketKey) + return ce + }) + + return +} + +// Close closes the kvdb file. +func (l *Lithium) Close(ctx context.Context) error { + l.Lock() + defer l.Unlock() + return l.bolt.Close() +} + +// Put creates/updates a key/value pair. +func (l *Lithium) Put(ctx context.Context, key []byte, value []byte) (err error) { + l.Lock() + defer l.Unlock() + + return l.update(func(bkt *bolt.Bucket) error { + return bkt.Put(key, value) + }) +} + +// Get read a key's value. +func (l *Lithium) Get(ctx context.Context, key []byte) (dst []byte, err error) { + l.Lock() + defer l.Unlock() + + err = l.view(func(bkt *bolt.Bucket) error { + src := bkt.Get(key) + dst = make([]byte, len(src)) + + for n := 0; n < len(dst); { + n += copy(dst, src) + } + + return nil + }) + + return +} + +// Delete deletes a key. +func (l *Lithium) Delete(ctx context.Context, key []byte) error { + l.Lock() + defer l.Unlock() + return l.update(func(bkt *bolt.Bucket) error { + return bkt.Delete(key) + }) +} + +// Scan scans all the key/value pairs. +func (l *Lithium) Scan(ctx context.Context, prefix []byte) <-chan ScanEntry { + ch := make(chan ScanEntry) + + go func() { + defer close(ch) + + l.Lock() + defer l.Unlock() + + ent := &LithiumScanEntry{} + + scan := func(bkt *bolt.Bucket) error { + c := bkt.Cursor() + for key, value := c.First(); key != nil && bytes.HasPrefix(key, prefix); key, value = c.Next() { + ent.key = key + ent.value = value + ch <- *ent + } + return nil + } + + if err := l.view(scan); err != nil { + ent.err = err + ch <- *ent + } + }() + + return ch +} + +func (l *Lithium) view(fn func(*bolt.Bucket) error) error { + return l.bolt.Update(func(tx *bolt.Tx) error { + bkt, err := l.getBucket(tx, l.RootBucketKey) + if err != nil { + return err + } + return fn(bkt) + }) +} + +func (l *Lithium) update(fn func(*bolt.Bucket) error) error { + return l.bolt.Update(func(tx *bolt.Tx) error { + bkt, err := l.getBucket(tx, l.RootBucketKey) + if err != nil { + return err + } + return fn(bkt) + }) +} + +func (l *Lithium) getBucket(tx *bolt.Tx, key []byte) (bkt *bolt.Bucket, err error) { + bkt = tx.Bucket(l.RootBucketKey) + if bkt == nil { + err = types.NewDetailedErr(types.ErrInvalidWALBucket, key) + } + return +} + +// NextSequence generates a new sequence. +func (l *Lithium) NextSequence(context.Context) (uint64, error) { + return 0, nil +} + +// LithiumScanEntry indicates an entry of scanning. +type LithiumScanEntry struct { + err error + key []byte + value []byte +} + +// Pair means a pair of key/value. +func (e LithiumScanEntry) Pair() ([]byte, []byte) { + return e.key, e.value +} + +// Error . +func (e LithiumScanEntry) Error() error { + return e.err +} diff --git a/wal/kv/lithium_test.go b/wal/kv/lithium_test.go new file mode 100644 index 000000000..adc855247 --- /dev/null +++ b/wal/kv/lithium_test.go @@ -0,0 +1,81 @@ +package kv + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + require.NoError(t, lit.Put(context.Background(), []byte("key"), []byte("value"))) +} + +func TestGet(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("key") + value := []byte("value") + require.NoError(t, lit.Put(context.Background(), key, value)) + + act, err := lit.Get(context.Background(), key) + require.NoError(t, err) + require.Equal(t, value, act) +} + +func TestDelete(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("key") + value := []byte("value") + require.NoError(t, lit.Put(context.Background(), key, value)) + + act, err := lit.Get(context.Background(), key) + require.NoError(t, err) + require.Equal(t, value, act) + + // deletes the key + require.NoError(t, lit.Delete(context.Background(), key)) + + act, err = lit.Get(context.Background(), key) + require.NoError(t, err) + require.Equal(t, []byte{}, act) +} + +func TestScan(t *testing.T) { + lit, cancel := newTestLithium(t) + defer cancel() + + key := []byte("/p1/key") + value := []byte("value") + require.NoError(t, lit.Put(context.Background(), key, value)) + require.NoError(t, lit.Put(context.Background(), []byte("/p2/key"), value)) + + ch := lit.Scan(context.Background(), []byte("/p1/")) + require.Equal(t, LithiumScanEntry{key: key, value: value}, <-ch) + require.Equal(t, nil, <-ch) +} + +func TestNextSequence(t *testing.T) { + // TODO +} + +func newTestLithium(t *testing.T) (lit *Lithium, cancel func()) { + path := "/tmp/lithium.unitest.wal" + os.Remove(path) + + lit = NewLithium() + require.NoError(t, lit.Open(context.Background(), path, 0666, time.Second)) + + cancel = func() { + require.NoError(t, lit.Close(context.Background())) + } + + return +} diff --git a/wal/kv/mocked.go b/wal/kv/mocked.go new file mode 100644 index 000000000..31f27df60 --- /dev/null +++ b/wal/kv/mocked.go @@ -0,0 +1,134 @@ +package kv + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "time" +) + +// MockedKV . +type MockedKV struct { + sync.Mutex + pool sync.Map + nextSeq uint64 +} + +// NewMockedKV . +func NewMockedKV() *MockedKV { + return &MockedKV{ + nextSeq: 1, + } +} + +// Open . +func (m *MockedKV) Open(ctx context.Context, path string, mode os.FileMode, timeout time.Duration) error { + return nil +} + +// Close . +func (m *MockedKV) Close(context.Context) error { + keys := []interface{}{} + m.pool.Range(func(key, _ interface{}) bool { + keys = append(keys, key) + return true + }) + + for _, key := range keys { + m.pool.Delete(key) + } + + return nil +} + +// NextSequence . +func (m *MockedKV) NextSequence(ctx context.Context) (nextSeq uint64, err error) { + m.Lock() + defer m.Unlock() + nextSeq = m.nextSeq + m.nextSeq++ + return +} + +// Put . +func (m *MockedKV) Put(ctx context.Context, key, value []byte) (err error) { + m.pool.Store(string(key), value) + return +} + +// Get . +func (m *MockedKV) Get(ctx context.Context, key []byte) (value []byte, err error) { + raw, ok := m.pool.Load(string(key)) + if !ok { + err = fmt.Errorf("no such key: %s", key) + return + } + + if value, ok = raw.([]byte); !ok { + err = fmt.Errorf("value must be a []byte, but %v", raw) + } + + return +} + +// Delete . +func (m *MockedKV) Delete(ctx context.Context, key []byte) (err error) { + m.pool.Delete(string(key)) + return +} + +// Scan . +func (m *MockedKV) Scan(ctx context.Context, prefix []byte) <-chan ScanEntry { + ch := make(chan ScanEntry) + + go func() { + defer close(ch) + + m.pool.Range(func(rkey, rvalue interface{}) (next bool) { + next = true + + var entry MockedScanEntry + defer func() { + ch <- entry + }() + + var ok bool + if entry.Key, ok = rkey.(string); !ok { + entry.Err = fmt.Errorf("key must be a string, but %v", rkey) + return + } + + if !strings.HasPrefix(entry.Key, string(prefix)) { + return + } + + if entry.Value, ok = rvalue.([]byte); !ok { + entry.Err = fmt.Errorf("value must be a []byte, but %v", rvalue) + return + } + + return + }) + }() + + return ch +} + +// MockedScanEntry . +type MockedScanEntry struct { + Err error + Key string + Value []byte +} + +// Pair . +func (e MockedScanEntry) Pair() ([]byte, []byte) { + return []byte(e.Key), e.Value +} + +// Error . +func (e MockedScanEntry) Error() error { + return e.Err +} diff --git a/wal/mocks/WAL.go b/wal/mocks/WAL.go new file mode 100644 index 000000000..47b1b4f47 --- /dev/null +++ b/wal/mocks/WAL.go @@ -0,0 +1,48 @@ +// Code generated by mockery v2.3.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + wal "github.com/projecteru2/core/wal" + mock "github.com/stretchr/testify/mock" +) + +// WAL is an autogenerated mock type for the WAL type +type WAL struct { + mock.Mock +} + +// Log provides a mock function with given fields: _a0, _a1, _a2 +func (_m *WAL) Log(_a0 context.Context, _a1 string, _a2 interface{}) (wal.Commit, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 wal.Commit + if rf, ok := ret.Get(0).(func(context.Context, string, interface{}) wal.Commit); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(wal.Commit) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, interface{}) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Recover provides a mock function with given fields: _a0 +func (_m *WAL) Recover(_a0 context.Context) { + _m.Called(_a0) +} + +// Register provides a mock function with given fields: _a0 +func (_m *WAL) Register(_a0 wal.EventHandler) { + _m.Called(_a0) +} diff --git a/wal/wal.go b/wal/wal.go new file mode 100644 index 000000000..8260a6216 --- /dev/null +++ b/wal/wal.go @@ -0,0 +1,91 @@ +package wal + +import ( + "context" + "time" +) + +const ( + // EventPrefix indicates the key prefix of all events' keys. + EventPrefix = "/events/" +) + +// WAL is the interface that groups the Register and Recover interfaces. +type WAL interface { + Registry + Recoverer + Logger + OpenCloser +} + +// Recoverer is the interface that wraps the basic Recover method. +type Recoverer interface { + Recover(context.Context) +} + +// Registry is the interface that wraps the basic Register method. +type Registry interface { + Register(EventHandler) +} + +// Logger is the interface that wraps the basic Log method. +type Logger interface { + Log(context.Context, string, interface{}) (Commit, error) +} + +// OpenCloser is the interface that groups the basic Open and Close methods. +type OpenCloser interface { + Open(context.Context, string, time.Duration) error + Close(context.Context) error +} + +// EventHandler indicates a handler of a specific event. +type EventHandler struct { + Event string + Check Check + Encode Encode + Decode Decode + Handle Handle +} + +// Encode is a function to encode a log item +type Encode func(interface{}) ([]byte, error) + +// Decode is a function to decode bytes to an interface{} +type Decode func([]byte) (interface{}, error) + +// Handle is a function to play a log item. +type Handle func(interface{}) error + +// Check is a function for checking a log item whether need to be played it. +type Check func(interface{}) (need bool, err error) + +// Commit is a function for committing an event log. +type Commit func(context.Context) error + +// Register registers a new event to doit. +func Register(handler EventHandler) { + wal.Register(handler) +} + +// Log records a log item. +func Log(ctx context.Context, event string, item interface{}) (Commit, error) { + return wal.Log(ctx, event, item) +} + +// Recover makes a disaster recovery. +func Recover(ctx context.Context) { + wal.Recover(ctx) +} + +// Close closes a WAL file. +func Close(ctx context.Context) error { + return wal.Close(ctx) +} + +// Open opens a WAL file. +func Open(ctx context.Context, path string, timeout time.Duration) error { + return wal.Open(ctx, path, timeout) +} + +var wal WAL = NewHydro() diff --git a/wal/wal_test.go b/wal/wal_test.go new file mode 100644 index 000000000..74d9d70b3 --- /dev/null +++ b/wal/wal_test.go @@ -0,0 +1,67 @@ +package wal + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/projecteru2/core/wal/kv" +) + +func TestRecover(t *testing.T) { + var checked bool + check := func(interface{}) (bool, error) { + checked = true + return true, nil + } + + var handled bool + handle := func(interface{}) (err error) { + handled = true + return + } + + var encoded bool + encode := func(interface{}) (bs []byte, err error) { + encoded = true + return + } + + var decoded bool + decode := func([]byte) (item interface{}, err error) { + decoded = true + return + } + + path := "/tmp/wal.unitest.wal" + os.Remove(path) + + require.NoError(t, Open(context.Background(), path, time.Second)) + defer Close(context.Background()) + + hydro, ok := wal.(*Hydro) + require.True(t, ok) + require.NotNil(t, hydro) + hydro.kv = kv.NewMockedKV() + + eventype := "create" + + Register(EventHandler{ + Event: eventype, + Encode: encode, + Decode: decode, + Check: check, + Handle: handle, + }) + + Log(context.Background(), eventype, struct{}{}) + + Recover(context.Background()) + require.True(t, checked) + require.True(t, handled) + require.True(t, encoded) + require.True(t, decoded) +}