diff --git a/Makefile b/Makefile index e19e7e1899..8618f38764 100644 --- a/Makefile +++ b/Makefile @@ -85,17 +85,19 @@ kubetypes: --gogo_out=plugins=grpc:. $< mocks: - mockery -case=underscore -dir query -output query/mocks -name Client - mockery -case=underscore -dir txutil -output txutil/mocks -name Client - mockery -case=underscore -dir app/market -output app/market/mocks -name Client - mockery -case=underscore -dir app/market -output app/market/mocks -name Engine - mockery -case=underscore -dir app/market -output app/market/mocks -name Facilitator - mockery -case=underscore -dir marketplace -output marketplace/mocks -name Handler - mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Client - mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Cluster - mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Deployment - mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Reservation - mockery -case=underscore -dir provider/manifest -output provider/manifest/mocks -name Handler + mockery -case=underscore -dir query -output query/mocks -name Client + mockery -case=underscore -dir txutil -output txutil/mocks -name Client + mockery -case=underscore -dir app/market -output app/market/mocks -name Client + mockery -case=underscore -dir app/market -output app/market/mocks -name Engine + mockery -case=underscore -dir app/market -output app/market/mocks -name Facilitator + mockery -case=underscore -dir marketplace -output marketplace/mocks -name Handler + mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Client + mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Cluster + mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Deployment + mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Reservation + mockery -case=underscore -dir provider/cluster/kube -output provider/cluster/kube/mocks -name Client + mockery -case=underscore -dir provider/manifest -output provider/manifest/mocks -name Handler + gofmt: find . -not -path './vendor*' -name '*.go' -type f | \ diff --git a/cmd/akash/deployment.go b/cmd/akash/deployment.go index f595adc8d2..65275dd502 100644 --- a/cmd/akash/deployment.go +++ b/cmd/akash/deployment.go @@ -11,7 +11,7 @@ import ( "github.com/ovrclk/akash/keys" "github.com/ovrclk/akash/manifest" "github.com/ovrclk/akash/marketplace" - mhttp "github.com/ovrclk/akash/provider/manifest/http" + http "github.com/ovrclk/akash/provider/http" "github.com/ovrclk/akash/sdl" "github.com/ovrclk/akash/types" . "github.com/ovrclk/akash/util" @@ -27,7 +27,8 @@ func deploymentCommand() *cobra.Command { cmd.AddCommand(createDeploymentCommand()) cmd.AddCommand(closeDeploymentCommand()) - cmd.AddCommand(sendManifestCommand()) + // todo: this command is not working + // cmd.AddCommand(sendManifestCommand()) return cmd } @@ -121,7 +122,7 @@ func createDeployment(session session.Session, cmd *cobra.Command, args []string // send manifest over http to provider uri fmt.Printf("Sending manifest to %v...\n", prov.HostURI) - err = mhttp.Send(mani, txclient.Signer(), prov, tx.Deployment) + err = http.SendManifest(mani, txclient.Signer(), prov, tx.Deployment) if err != nil { fmt.Printf("ERROR: %v", err) } @@ -182,57 +183,58 @@ func closeDeployment(session session.Session, cmd *cobra.Command, args []string) return nil } -func sendManifestCommand() *cobra.Command { - - cmd := &cobra.Command{ - Use: "sendmani ", - Short: "send manifest to all deployment providers", - Args: cobra.ExactArgs(2), - RunE: session.WithSession( - session.RequireKey(session.RequireNode(sendManifest))), - } - - session.AddFlagNode(cmd, cmd.Flags()) - session.AddFlagKey(cmd, cmd.Flags()) - - return cmd -} - -func sendManifest(session session.Session, cmd *cobra.Command, args []string) error { - signer, _, err := session.Signer() - if err != nil { - return err - } - - sdl, err := sdl.ReadFile(args[0]) - if err != nil { - return err - } - - mani, err := sdl.Manifest() - if err != nil { - return err - } - - depAddr, err := keys.ParseDeploymentPath(args[1]) - if err != nil { - return err - } - - leases, err := session.QueryClient().DeploymentLeases(session.Ctx(), depAddr.ID()) - if err != nil { - return err - } - - for _, lease := range leases.Items { - provider, err := session.QueryClient().Provider(session.Ctx(), lease.Provider) - if err != nil { - return err - } - err = mhttp.Send(mani, signer, provider, lease.Deployment) - if err != nil { - return err - } - } - return nil -} +// todo: this command is not working +// func sendManifestCommand() *cobra.Command { + +// cmd := &cobra.Command{ +// Use: "sendmani ", +// Short: "send manifest to all deployment providers", +// Args: cobra.ExactArgs(2), +// RunE: session.WithSession( +// session.RequireKey(session.RequireNode(sendManifest))), +// } + +// session.AddFlagNode(cmd, cmd.Flags()) +// session.AddFlagKey(cmd, cmd.Flags()) + +// return cmd +// } + +// func sendManifest(session session.Session, cmd *cobra.Command, args []string) error { +// signer, _, err := session.Signer() +// if err != nil { +// return err +// } + +// sdl, err := sdl.ReadFile(args[0]) +// if err != nil { +// return err +// } + +// mani, err := sdl.Manifest() +// if err != nil { +// return err +// } + +// depAddr, err := keys.ParseDeploymentPath(args[1]) +// if err != nil { +// return err +// } + +// leases, err := session.QueryClient().DeploymentLeases(session.Ctx(), depAddr.ID()) +// if err != nil { +// return err +// } + +// for _, lease := range leases.Items { +// provider, err := session.QueryClient().Provider(session.Ctx(), lease.Provider) +// if err != nil { +// return err +// } +// err = http.SendManifest(mani, signer, provider, lease.Deployment) +// if err != nil { +// return err +// } +// } +// return nil +// } diff --git a/cmd/akash/provider.go b/cmd/akash/provider.go index ae230be2c6..52ae4db45d 100644 --- a/cmd/akash/provider.go +++ b/cmd/akash/provider.go @@ -13,7 +13,7 @@ import ( "github.com/ovrclk/akash/provider/cluster" "github.com/ovrclk/akash/provider/cluster/kube" "github.com/ovrclk/akash/provider/event" - "github.com/ovrclk/akash/provider/manifest/http" + "github.com/ovrclk/akash/provider/http" psession "github.com/ovrclk/akash/provider/session" "github.com/ovrclk/akash/types" ptype "github.com/ovrclk/akash/types/provider" @@ -198,7 +198,7 @@ func doProviderRunCommand(session session.Session, cmd *cobra.Command, args []st go func() { defer cancel() - errch <- http.RunServer(ctx, session.Log(), "3001", service.ManifestHandler()) + errch <- http.RunServer(ctx, session.Log(), "3001", service.ManifestHandler(), cclient) }() var reterr error diff --git a/provider/cluster/client.go b/provider/cluster/client.go index 8e2203adbb..6e319d3d0b 100644 --- a/provider/cluster/client.go +++ b/provider/cluster/client.go @@ -1,12 +1,17 @@ package cluster -import "github.com/ovrclk/akash/types" +import ( + "github.com/ovrclk/akash/types" + "k8s.io/api/apps/v1" +) type Client interface { Deploy(types.LeaseID, *types.ManifestGroup) error Teardown(types.LeaseID) error Deployments() ([]Deployment, error) + KubeDeployments(types.LeaseID) (*v1.DeploymentList, error) + KubeDeployment(types.LeaseID, string) (*v1.Deployment, error) } type Deployment interface { @@ -24,6 +29,14 @@ func (nullClient) Deploy(_ types.LeaseID, _ *types.ManifestGroup) error { return nil } +func (nullClient) KubeDeployments(_ types.LeaseID) (*v1.DeploymentList, error) { + return nil, nil +} + +func (nullClient) KubeDeployment(_ types.LeaseID, _ string) (*v1.Deployment, error) { + return nil, nil +} + func (nullClient) Teardown(_ types.LeaseID) error { return nil } diff --git a/provider/cluster/kube/client.go b/provider/cluster/kube/client.go index c77261c11e..4e5bbb5382 100644 --- a/provider/cluster/kube/client.go +++ b/provider/cluster/kube/client.go @@ -10,6 +10,7 @@ import ( "github.com/ovrclk/akash/provider/cluster" "github.com/ovrclk/akash/types" "github.com/tendermint/tmlibs/log" + "k8s.io/api/apps/v1" apiextcs "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -103,8 +104,24 @@ func (c *client) Deployments() ([]cluster.Deployment, error) { return deployments, nil } -func (c *client) Deploy(lid types.LeaseID, group *types.ManifestGroup) error { +// todo: limit number of results and do pagination / streaming +func (c *client) KubeDeployments(lid types.LeaseID) (*v1.DeploymentList, error) { + deployments, err := c.kc.AppsV1().Deployments(lidNS(lid)).List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + return deployments, nil +} +func (c *client) KubeDeployment(lid types.LeaseID, name string) (*v1.Deployment, error) { + deployment, err := c.kc.AppsV1().Deployments(lidNS(lid)).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return deployment, nil +} + +func (c *client) Deploy(lid types.LeaseID, group *types.ManifestGroup) error { if err := applyNS(c.kc, newNSBuilder(lid, group)); err != nil { c.log.Error("applying namespace", "err", err, "lease", lid) return err diff --git a/provider/cluster/kube/mocks/client.go b/provider/cluster/kube/mocks/client.go new file mode 100644 index 0000000000..ada4f4c510 --- /dev/null +++ b/provider/cluster/kube/mocks/client.go @@ -0,0 +1,110 @@ +// Code generated by mockery v1.0.0 +package mocks + +import cluster "github.com/ovrclk/akash/provider/cluster" + +import mock "github.com/stretchr/testify/mock" +import types "github.com/ovrclk/akash/types" +import v1 "k8s.io/api/apps/v1" + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// Deploy provides a mock function with given fields: _a0, _a1 +func (_m *Client) Deploy(_a0 types.LeaseID, _a1 *types.ManifestGroup) error { + ret := _m.Called(_a0, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(types.LeaseID, *types.ManifestGroup) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Deployments provides a mock function with given fields: +func (_m *Client) Deployments() ([]cluster.Deployment, error) { + ret := _m.Called() + + var r0 []cluster.Deployment + if rf, ok := ret.Get(0).(func() []cluster.Deployment); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]cluster.Deployment) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// KubeDeployment provides a mock function with given fields: _a0, _a1 +func (_m *Client) KubeDeployment(_a0 types.LeaseID, _a1 string) (*v1.Deployment, error) { + ret := _m.Called(_a0, _a1) + + var r0 *v1.Deployment + if rf, ok := ret.Get(0).(func(types.LeaseID, string) *v1.Deployment); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.Deployment) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.LeaseID, string) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// KubeDeployments provides a mock function with given fields: _a0 +func (_m *Client) KubeDeployments(_a0 types.LeaseID) (*v1.DeploymentList, error) { + ret := _m.Called(_a0) + + var r0 *v1.DeploymentList + if rf, ok := ret.Get(0).(func(types.LeaseID) *v1.DeploymentList); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.DeploymentList) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.LeaseID) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Teardown provides a mock function with given fields: _a0 +func (_m *Client) Teardown(_a0 types.LeaseID) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(types.LeaseID) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/provider/cluster/mocks/client.go b/provider/cluster/mocks/client.go index 110f0ac6fc..81dee5fd84 100644 --- a/provider/cluster/mocks/client.go +++ b/provider/cluster/mocks/client.go @@ -4,6 +4,7 @@ package mocks import cluster "github.com/ovrclk/akash/provider/cluster" import mock "github.com/stretchr/testify/mock" import types "github.com/ovrclk/akash/types" +import v1 "k8s.io/api/apps/v1" // Client is an autogenerated mock type for the Client type type Client struct { @@ -47,6 +48,52 @@ func (_m *Client) Deployments() ([]cluster.Deployment, error) { return r0, r1 } +// KubeDeployment provides a mock function with given fields: _a0, _a1 +func (_m *Client) KubeDeployment(_a0 types.LeaseID, _a1 string) (*v1.Deployment, error) { + ret := _m.Called(_a0, _a1) + + var r0 *v1.Deployment + if rf, ok := ret.Get(0).(func(types.LeaseID, string) *v1.Deployment); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.Deployment) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.LeaseID, string) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// KubeDeployments provides a mock function with given fields: _a0 +func (_m *Client) KubeDeployments(_a0 types.LeaseID) (*v1.DeploymentList, error) { + ret := _m.Called(_a0) + + var r0 *v1.DeploymentList + if rf, ok := ret.Get(0).(func(types.LeaseID) *v1.DeploymentList); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.DeploymentList) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.LeaseID) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Teardown provides a mock function with given fields: _a0 func (_m *Client) Teardown(_a0 types.LeaseID) error { ret := _m.Called(_a0) diff --git a/provider/cluster/monitor.go b/provider/cluster/monitor.go index 4c743fa9ed..69b6a151e5 100644 --- a/provider/cluster/monitor.go +++ b/provider/cluster/monitor.go @@ -81,7 +81,6 @@ func (dm *deploymentMonitor) teardown() error { func (dm *deploymentMonitor) run() { defer dm.lc.ShutdownCompleted() - runch := dm.do(dm.doDeploy) loop: @@ -103,7 +102,6 @@ loop: dm.mgroup = mgroup case dsDeployComplete: dm.mgroup = mgroup - // start update dm.state = dsDeployActive runch = dm.do(dm.doDeploy) @@ -125,7 +123,6 @@ loop: dm.state = dsDeployComplete case dsDeployPending: - // start update dm.state = dsDeployActive runch = dm.do(dm.doDeploy) diff --git a/provider/manifest/http/client.go b/provider/http/client.go similarity index 87% rename from provider/manifest/http/client.go rename to provider/http/client.go index 84d878b5d9..b7d2352f0e 100644 --- a/provider/manifest/http/client.go +++ b/provider/http/client.go @@ -10,7 +10,7 @@ import ( "github.com/ovrclk/akash/types" ) -func Send(manifest *types.Manifest, signer txutil.Signer, provider *types.Provider, deployment []byte) error { +func SendManifest(manifest *types.Manifest, signer txutil.Signer, provider *types.Provider, deployment []byte) error { _, buf, err := mutil.SignManifest(manifest, signer, deployment) if err != nil { return err diff --git a/provider/http/server.go b/provider/http/server.go new file mode 100644 index 0000000000..876828adf2 --- /dev/null +++ b/provider/http/server.go @@ -0,0 +1,182 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/gogo/protobuf/jsonpb" + "github.com/gorilla/mux" + "github.com/ovrclk/akash/keys" + "github.com/ovrclk/akash/provider/cluster/kube" + "github.com/ovrclk/akash/provider/manifest" + "github.com/ovrclk/akash/types" + "github.com/tendermint/tmlibs/log" +) + +const ( + contentType = "application/json" + manifestPath = "/manifest" + statusPath = "/status" + leasePathPrefix = "/lease/" + deployment = "deployment" + group = "group" + order = "order" + provider = "provider" + name = "name" + leaseID = "{" + deployment + "}/{" + group + "}/{" + order + "}/{" + provider + "}" + leasePath = leasePathPrefix + leaseID + servicePath = leasePathPrefix + leaseID + "/{" + name + "}" +) + +func errorResponse(w http.ResponseWriter, log log.Logger, status int, message string) { + log.Error("error", "status", status, "message", message) + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("Content-Type", contentType) + w.WriteHeader(status) + json.NewEncoder(w).Encode(map[string]string{"message": message}) +} + +func manifestHandler(log log.Logger, phandler manifest.Handler) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + errorResponse(w, + log, + http.StatusMethodNotAllowed, + http.StatusText(http.StatusMethodNotAllowed)) + return + } + if r.Header.Get("Content-Type") != contentType { + errorResponse(w, + log, + http.StatusUnsupportedMediaType, + fmt.Sprintf("Content-Type '%v' required", contentType)) + return + } + if r.Body == nil { + errorResponse(w, log, http.StatusBadRequest, "Empty request body") + return + } + + obj := &types.ManifestRequest{} + if err := jsonpb.Unmarshal(r.Body, obj); err != nil { + errorResponse(w, log, http.StatusBadRequest, "Error decoding body") + return + } + r.Body.Close() + + log.Debug(fmt.Sprintf("%+v", obj)) + + if err := phandler.HandleManifest(obj); err != nil { + errorResponse(w, log, http.StatusBadRequest, "Invalid manifest") + return + } + + // respond with success + w.WriteHeader(http.StatusOK) + } +} + +func requestLogger(log log.Logger) mux.MiddlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + log.Info(r.Method, "path", r.URL.Path) + next.ServeHTTP(w, r) + }) + } +} + +func newLeaseHandler(log log.Logger, phandler manifest.Handler, client kube.Client) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=us-ascii") + // todo: check TLS cert against lease owner + lease, err := keys.ParseLeasePath(strings.TrimPrefix(r.URL.RequestURI(), leasePathPrefix)) + deployments, err := client.KubeDeployments(lease.LeaseID) + if err != nil { + log.Error(err.Error()) + errorResponse(w, log, http.StatusBadRequest, "no deployments found for lease") + return + } + if deployments == nil { + errorResponse(w, log, http.StatusBadRequest, "no deployments found for lease") + return + } + response := make(map[string]string) + for _, deployment := range deployments.Items { + response[deployment.Name] = fmt.Sprintf("available replicas: %v/%v", deployment.Status.AvailableReplicas, deployment.Status.Replicas) + } + json.NewEncoder(w).Encode(response) + } +} + +func newStatusHandler() func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=us-ascii") + w.Write([]byte("OK\n")) + } +} + +func newLeaseStatusHandler(log log.Logger, phandler manifest.Handler, client kube.Client) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=us-ascii") + // todo: check TLS cert against lease owner + vars := mux.Vars(r) + lease, err := keys.ParseLeasePath(strings.TrimSuffix(strings.TrimPrefix(r.URL.RequestURI(), leasePathPrefix), "/"+vars[name])) + deployment, err := client.KubeDeployment(lease.LeaseID, vars[name]) + if err != nil { + log.Error(err.Error()) + errorResponse(w, log, http.StatusBadRequest, "service not found for lease") + return + } + if deployment == nil { + errorResponse(w, log, http.StatusBadRequest, "service not found for lease") + return + } + status := deployment.Status + status.Conditions = nil + json.NewEncoder(w).Encode(status) + } +} + +func createHandlers(log log.Logger, handler manifest.Handler, client kube.Client) http.Handler { + r := mux.NewRouter() + r.HandleFunc(statusPath, newStatusHandler()) + r.HandleFunc(manifestPath, manifestHandler(log, handler)) + r.HandleFunc(leasePath, newLeaseHandler(log, handler, client)) + r.HandleFunc(servicePath, newLeaseStatusHandler(log, handler, client)) + r.Use(requestLogger(log)) + return r +} + +func RunServer(ctx context.Context, log log.Logger, port string, handler manifest.Handler, client kube.Client) error { + + address := fmt.Sprintf(":%v", port) + + server := &http.Server{ + Addr: address, + Handler: createHandlers(log, handler, client), + } + + ctx, cancel := context.WithCancel(ctx) + + donech := make(chan struct{}) + + go func() { + defer close(donech) + <-ctx.Done() + log.Info("Shutting down server") + server.Shutdown(context.Background()) + }() + + log.Info("Starting server", "address", address) + err := server.ListenAndServe() + cancel() + + <-donech + + log.Info("Server shutdown") + + return err +} diff --git a/provider/manifest/http/server_test.go b/provider/http/server_test.go similarity index 59% rename from provider/manifest/http/server_test.go rename to provider/http/server_test.go index 932e9b9aa7..99cc6de629 100644 --- a/provider/manifest/http/server_test.go +++ b/provider/http/server_test.go @@ -2,10 +2,13 @@ package http import ( "context" + "fmt" "io/ioutil" "net/http" "testing" + "github.com/ovrclk/akash/provider/cluster/kube" + cmock "github.com/ovrclk/akash/provider/cluster/kube/mocks" pmanifest "github.com/ovrclk/akash/provider/manifest/mocks" pmock "github.com/ovrclk/akash/provider/manifest/mocks" "github.com/ovrclk/akash/sdl" @@ -14,11 +17,23 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "k8s.io/api/apps/v1" ) +func TestStatus(t *testing.T) { + withServer(t, func() { + resp, err := http.Get("http://localhost:3001/status") + require.NoError(t, err) + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + fmt.Println(string(body)) + require.Equal(t, []byte("OK\n"), body) + }, nil, nil) +} + func TestManifest(t *testing.T) { - sdl, err := sdl.ReadFile("../../../_docs/deployment.yml") + sdl, err := sdl.ReadFile("../../_run/multi/deployment.yml") require.NoError(t, err) mani, err := sdl.Manifest() @@ -35,26 +50,31 @@ func TestManifest(t *testing.T) { handler := new(pmock.Handler) handler.On("HandleManifest", mock.Anything).Return(nil).Once() + client := new(cmock.Client) withServer(t, func() { - err = Send(mani, signer, provider, deployment) + err = SendManifest(mani, signer, provider, deployment) require.NoError(t, err) - }, handler) + }, handler, client) } -func TestStatus(t *testing.T) { +func TestLease(t *testing.T) { handler := new(pmock.Handler) + client := new(cmock.Client) + mockResp := v1.DeploymentList{} + client.On("KubeDeployments", mock.Anything, mock.Anything).Return(&mockResp, nil).Once() withServer(t, func() { - resp, err := http.Get("http://localhost:3001/status") + resp, err := http.Get("http://localhost:3001/lease/deployment/group/order/provider") require.NoError(t, err) body, err := ioutil.ReadAll(resp.Body) require.NoError(t, err) - require.Equal(t, []byte("OK\n"), body) - }, handler) + fmt.Println(string(body)) + require.Equal(t, []byte("{}\n"), body) + }, handler, client) } -func withServer(t *testing.T, fn func(), h *pmanifest.Handler) { +func withServer(t *testing.T, fn func(), h *pmanifest.Handler, c kube.Client) { donech := make(chan struct{}) defer func() { <-donech }() @@ -63,7 +83,7 @@ func withServer(t *testing.T, fn func(), h *pmanifest.Handler) { go func() { defer close(donech) - err := RunServer(ctx, testutil.Logger(), "3001", h) + err := RunServer(ctx, testutil.Logger(), "3001", h, c) assert.Error(t, http.ErrServerClosed, err) }() diff --git a/provider/manifest/handler.go b/provider/manifest/handler.go index 6dfe13df2d..6010f4b04d 100644 --- a/provider/manifest/handler.go +++ b/provider/manifest/handler.go @@ -181,7 +181,6 @@ loop: } h.session.Log().Error("deployment", did.EncodeString(), err.Error()) } - mstate := h.getManifestState(did) h.session.Log().Info("manifest received", "deployment", did) @@ -242,7 +241,6 @@ func (h *handler) getManifestState(did base.Bytes) *manifestState { func (h *handler) checkManifestState(ctx context.Context, mstate *manifestState, did base.Bytes) { if mstate.complete() { - // If all information has been received, emit ManifestReceived event. // TODO: validate manifest diff --git a/provider/manifest/http/server.go b/provider/manifest/http/server.go deleted file mode 100644 index 8cbe1d4711..0000000000 --- a/provider/manifest/http/server.go +++ /dev/null @@ -1,139 +0,0 @@ -package http - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - - "github.com/gogo/protobuf/jsonpb" - "github.com/gorilla/mux" - "github.com/ovrclk/akash/provider/manifest" - "github.com/ovrclk/akash/types" - "github.com/tendermint/tmlibs/log" -) - -const ( - contentType = "application/json" - manifestPath = "/manifest" - statusPath = "/status" -) - -type handler struct { - phandler manifest.Handler - log log.Logger -} - -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - h.error(w, - http.StatusMethodNotAllowed, - http.StatusText(http.StatusMethodNotAllowed)) - return - } - if r.Header.Get("Content-Type") != contentType { - h.error(w, - http.StatusUnsupportedMediaType, - fmt.Sprintf("Content-Type '%v' required", contentType)) - return - } - if r.Body == nil { - h.error(w, http.StatusBadRequest, "Empty request body") - return - } - - obj := &types.ManifestRequest{} - if err := jsonpb.Unmarshal(r.Body, obj); err != nil { - h.error(w, http.StatusBadRequest, "Error decoding body") - return - } - r.Body.Close() - - h.log.Debug(fmt.Sprintf("%+v", obj)) - - if err := h.phandler.HandleManifest(obj); err != nil { - h.error(w, http.StatusBadRequest, "Invalid manifest") - return - } - - // respond with success - w.WriteHeader(http.StatusOK) -} - -func (h *handler) error(w http.ResponseWriter, status int, message string) { - h.log.Error("error", "status", status, "message", message) - w.Header().Set("X-Content-Type-Options", "nosniff") - w.Header().Set("Content-Type", contentType) - w.WriteHeader(status) - json.NewEncoder(w).Encode(map[string]string{"message": message}) -} - -func newHandler(log log.Logger, phandler manifest.Handler) http.Handler { - return &handler{ - log: log, - phandler: phandler, - } -} - -func requestLogger(log log.Logger) mux.MiddlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - log.Info(r.Method, "path", r.URL.Path) - next.ServeHTTP(w, r) - }) - } -} - -func newStatusHandler(log log.Logger, - phandler manifest.Handler) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain; charset=us-ascii") - - if _, err := w.Write([]byte("OK\n")); err != nil { - log.Error("error in status response", "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - } -} - -func createHandlers(log log.Logger, handler manifest.Handler) http.Handler { - r := mux.NewRouter() - r.Handle(manifestPath, newHandler(log, handler)) - r.HandleFunc(statusPath, newStatusHandler(log, handler)) - r.Use(requestLogger(log)) - return r -} - -func RunServer(ctx context.Context, log log.Logger, port string, handler manifest.Handler) error { - - address := fmt.Sprintf(":%v", port) - - server := &http.Server{ - Addr: address, - Handler: createHandlers(log, handler), - } - - ctx, cancel := context.WithCancel(ctx) - - donech := make(chan struct{}) - - go func() { - defer close(donech) - <-ctx.Done() - log.Info("Shutting down server") - server.Shutdown(context.Background()) - }() - - log.Info("Starting server", "address", address) - err := server.ListenAndServe() - cancel() - - <-donech - - log.Info("Server shutdown") - - return err -} diff --git a/types/id.go b/types/id.go index aafe7d848c..45bf64c326 100644 --- a/types/id.go +++ b/types/id.go @@ -169,4 +169,4 @@ func (id LeaseID) GroupID() DeploymentGroupID { func (id LeaseID) DeploymentID() base.Bytes { return id.Deployment -} +} \ No newline at end of file