Skip to content

Commit

Permalink
provider: log streaming
Browse files Browse the repository at this point in the history
fixes: #222
  • Loading branch information
aastein authored Jul 5, 2018
1 parent 37bc8e1 commit 112ceaa
Show file tree
Hide file tree
Showing 19 changed files with 1,718 additions and 342 deletions.
112 changes: 55 additions & 57 deletions cmd/akash/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func deploymentCommand() *cobra.Command {

cmd.AddCommand(createDeploymentCommand())
cmd.AddCommand(closeDeploymentCommand())
// todo: this command is not working
// cmd.AddCommand(sendManifestCommand())
cmd.AddCommand(sendManifestCommand())

return cmd
}
Expand Down Expand Up @@ -183,58 +182,57 @@ func closeDeployment(session session.Session, cmd *cobra.Command, args []string)
return nil
}

// todo: this command is not working
// func sendManifestCommand() *cobra.Command {

// cmd := &cobra.Command{
// Use: "sendmani <manifest> <deployment>",
// Short: "send manifest to all deployment providers",
// Args: cobra.ExactArgs(2),
// RunE: session.WithSession(
// session.RequireKey(session.RequireNode(sendManifest))),
// }

// session.AddFlagNode(cmd, cmd.Flags())
// session.AddFlagKey(cmd, cmd.Flags())

// return cmd
// }

// func sendManifest(session session.Session, cmd *cobra.Command, args []string) error {
// signer, _, err := session.Signer()
// if err != nil {
// return err
// }

// sdl, err := sdl.ReadFile(args[0])
// if err != nil {
// return err
// }

// mani, err := sdl.Manifest()
// if err != nil {
// return err
// }

// depAddr, err := keys.ParseDeploymentPath(args[1])
// if err != nil {
// return err
// }

// leases, err := session.QueryClient().DeploymentLeases(session.Ctx(), depAddr.ID())
// if err != nil {
// return err
// }

// for _, lease := range leases.Items {
// provider, err := session.QueryClient().Provider(session.Ctx(), lease.Provider)
// if err != nil {
// return err
// }
// err = http.SendManifest(mani, signer, provider, lease.Deployment)
// if err != nil {
// return err
// }
// }
// return nil
// }
func sendManifestCommand() *cobra.Command {

cmd := &cobra.Command{
Use: "sendmani <manifest> <deployment>",
Short: "send manifest to all deployment providers",
Args: cobra.ExactArgs(2),
RunE: session.WithSession(
session.RequireKey(session.RequireNode(sendManifest))),
}

session.AddFlagNode(cmd, cmd.Flags())
session.AddFlagKey(cmd, cmd.Flags())

return cmd
}

func sendManifest(session session.Session, cmd *cobra.Command, args []string) error {
signer, _, err := session.Signer()
if err != nil {
return err
}

sdl, err := sdl.ReadFile(args[0])
if err != nil {
return err
}

mani, err := sdl.Manifest()
if err != nil {
return err
}

depAddr, err := keys.ParseDeploymentPath(args[1])
if err != nil {
return err
}

leases, err := session.QueryClient().DeploymentLeases(session.Ctx(), depAddr.ID())
if err != nil {
return err
}

for _, lease := range leases.Items {
provider, err := session.QueryClient().Provider(session.Ctx(), lease.Provider)
if err != nil {
return err
}
err = http.SendManifest(mani, signer, provider, lease.Deployment)
if err != nil {
return err
}
}
return nil
}
110 changes: 110 additions & 0 deletions cmd/akash/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"

"github.com/ovrclk/akash/cmd/akash/session"
"github.com/ovrclk/akash/cmd/common"
"github.com/ovrclk/akash/keys"
"github.com/ovrclk/akash/types"
"github.com/spf13/cobra"
)

func logsCommand() *cobra.Command {

cmd := &cobra.Command{
Use: "logs <lease>",
Short: "service logs",
Args: cobra.ExactArgs(2),
RunE: session.WithSession(session.RequireNode(logs)),
}

session.AddFlagNode(cmd, cmd.PersistentFlags())
cmd.Flags().Int64P("lines", "l", 10, "Number of lines from the end of the logs to show per service")
cmd.Flags().BoolP("follow", "f", false, "Follow the log stream of the service")

return cmd
}

func logs(session session.Session, cmd *cobra.Command, args []string) error {
serviceName := args[0]
leasePath := args[1]
lease, err := keys.ParseLeasePath(leasePath)
if err != nil {
return err
}
provider, err := session.QueryClient().Provider(session.Ctx(), lease.Provider)
if err != nil {
return err
}
tailLines, err := cmd.Flags().GetInt64("lines")
if err != nil {
return err
}
follow, err := cmd.Flags().GetBool("follow")
if err != nil {
return err
}
options := types.LogOptions{
TailLines: tailLines,
Follow: follow,
}
b, err := json.Marshal(options)
if err != nil {
return err
}
url := provider.HostURI + "/logs/" + leasePath + "/" + serviceName
body, err := stream(session.Ctx(), url, b)
if err != nil {
return err
}

scanner := bufio.NewScanner(body)
if err := common.RunForever(printLog(session, scanner)); err != nil {
fmt.Println(err.Error())
}

defer body.Close()
return nil
}

func printLog(session session.Session, scanner *bufio.Scanner) func(context.Context) error {
return func(ctx context.Context) error {
for scanner.Scan() {
log := types.LogResponse{}
if err := json.Unmarshal(scanner.Bytes(), &log); err != nil {
session.Log().Error(err.Error())
}
if log.Result != nil {
fmt.Printf("[%v] %v\n", log.Result.Name, log.Result.Message)
}
}
return scanner.Err()
}
}

func stream(ctx context.Context, url string, data []byte) (io.ReadCloser, error) {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
return nil, err
}
req.Header.Set("X-Custom-Header", "Akash")
req.Header.Set("Content-Type", "application/json")
req.WithContext(ctx)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, errors.New("response not ok: " + resp.Status)
}
return resp.Body, nil
}
1 change: 1 addition & 0 deletions cmd/akash/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func main() {
root.AddCommand(query.QueryCommand())
root.AddCommand(statusCommand())
root.AddCommand(marketplaceCommand())
root.AddCommand(logsCommand())

if err := root.Execute(); err != nil {
os.Exit(1)
Expand Down
32 changes: 0 additions & 32 deletions grpc/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,3 @@ func Marshal(obj proto.Message) ([]byte, error) {
}
return buf.Bytes(), nil
}

// func VerifySignature(request *types.GRPCRequest) (crypto.Address, error) {
// buf := bytes.Buffer{}
// marshaler := jsonpb.Marshaler{}
// // switch v := request.Payload.(type) {
// // case *types.GRPCRequest_ManifestRequest:
// // if err := marshaler.Marshal(&buf, v.ManifestRequest); err != nil {
// // return nil, err
// // }
// // default:
// // return nil, types.ErrInvalidPayload{Message: "invalid payload"}
// // }

// if err := marshaler.Marshal(&buf, request.ManifestRequest); err != nil {
// return nil, err
// }

// key, err := crypto.PubKeyFromBytes(request.Key)
// if err != nil {
// return nil, err
// }

// sig, err := crypto.SignatureFromBytes(request.Signature)
// if err != nil {
// return nil, err
// }

// if !key.VerifyBytes(buf.Bytes(), sig) {
// return nil, types.ErrInvalidSignature{"invalud signature"}
// }
// return key.Address(), err
// }
45 changes: 36 additions & 9 deletions provider/cluster/client.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,70 @@
package cluster

import (
"bufio"
"context"
"io"

"github.com/ovrclk/akash/types"
"k8s.io/api/apps/v1"
)

type Client interface {
Deploy(types.LeaseID, *types.ManifestGroup) error
Teardown(types.LeaseID) error
TeardownLease(types.LeaseID) error
TeardownNamespace(string) error

Deployments() ([]Deployment, error)
KubeDeployments(types.LeaseID) (*v1.DeploymentList, error)
KubeDeployment(types.LeaseID, string) (*v1.Deployment, error)
LeaseStatus(types.LeaseID) (*types.LeaseStatusResponse, error)
ServiceStatus(types.LeaseID, string) (*types.ServiceStatusResponse, error)
ServiceLogs(context.Context, types.LeaseID, int64, bool) ([]*ServiceLog, error)
}

type Deployment interface {
LeaseID() types.LeaseID
ManifestGroup() *types.ManifestGroup
}

func NullClient() Client {
return nullClient(0)
type ServiceLog struct {
Name string
Stream io.ReadCloser
Scanner *bufio.Scanner
}

type nullClient int

func NewServiceLog(name string, stream io.ReadCloser) *ServiceLog {
return &ServiceLog{
Name: name,
Stream: stream,
Scanner: bufio.NewScanner(stream),
}
}

func NullClient() Client {
return nullClient(0)
}

func (nullClient) Deploy(_ types.LeaseID, _ *types.ManifestGroup) error {
return nil
}

func (nullClient) KubeDeployments(_ types.LeaseID) (*v1.DeploymentList, error) {
func (nullClient) LeaseStatus(_ types.LeaseID) (*types.LeaseStatusResponse, error) {
return nil, nil
}

func (nullClient) ServiceStatus(_ types.LeaseID, _ string) (*types.ServiceStatusResponse, error) {
return nil, nil
}

func (nullClient) KubeDeployment(_ types.LeaseID, _ string) (*v1.Deployment, error) {
func (nullClient) ServiceLogs(_ context.Context, _ types.LeaseID, _ int64, _ bool) ([]*ServiceLog, error) {
return nil, nil
}

func (nullClient) Teardown(_ types.LeaseID) error {
func (nullClient) TeardownLease(_ types.LeaseID) error {
return nil
}

func (nullClient) TeardownNamespace(_ string) error {
return nil
}

Expand Down
Loading

0 comments on commit 112ceaa

Please sign in to comment.