From 5d979ab3d21edba4113d3551c46ccf1fd8b1be23 Mon Sep 17 00:00:00 2001 From: YaoZengzeng Date: Thu, 29 Mar 2018 17:24:10 +0800 Subject: [PATCH] feature: error stream for cri stream server Signed-off-by: YaoZengzeng --- cri/stream/docs.go | 7 ++ cri/stream/remotecommand/attach.go | 11 ++- cri/stream/remotecommand/errors.go | 122 +++++++++++++++++++++++++ cri/stream/remotecommand/exec.go | 27 +++++- cri/stream/remotecommand/httpstream.go | 28 +++++- cri/stream/remotecommand/websocket.go | 9 +- cri/stream/server.go | 2 +- daemon/mgr/cri_stream.go | 12 +-- 8 files changed, 205 insertions(+), 13 deletions(-) create mode 100644 cri/stream/docs.go create mode 100644 cri/stream/remotecommand/errors.go diff --git a/cri/stream/docs.go b/cri/stream/docs.go new file mode 100644 index 000000000..70642d2dd --- /dev/null +++ b/cri/stream/docs.go @@ -0,0 +1,7 @@ +package stream + +// NOTE: the code in this package and its subpackage is almost copy from +// kubernete's official code base and make some modification to satisify +// our need. +// The reason why we do this is not to vendor so many packages which most +// code is useless and we want to keep pouch simple and clean :) diff --git a/cri/stream/remotecommand/attach.go b/cri/stream/remotecommand/attach.go index f50cfe74c..cfcee006d 100644 --- a/cri/stream/remotecommand/attach.go +++ b/cri/stream/remotecommand/attach.go @@ -1,6 +1,7 @@ package remotecommand import ( + "fmt" "net/http" "time" ) @@ -21,10 +22,18 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, co } defer ctx.conn.Close() - attacher.Attach(container, streamOpts, &Streams{ + err := attacher.Attach(container, streamOpts, &Streams{ StreamCh: make(chan struct{}, 1), StdinStream: ctx.stdinStream, StdoutStream: ctx.stdoutStream, StderrStream: ctx.stderrStream, }) + if err != nil { + err = fmt.Errorf("error attaching to container: %v", err) + ctx.writeStatus(NewInternalError(err)) + } else { + ctx.writeStatus(&StatusError{ErrStatus: Status{ + Status: StatusSuccess, + }}) + } } diff --git a/cri/stream/remotecommand/errors.go b/cri/stream/remotecommand/errors.go new file mode 100644 index 000000000..22e4b0289 --- /dev/null +++ b/cri/stream/remotecommand/errors.go @@ -0,0 +1,122 @@ +package remotecommand + +import ( + "fmt" + "net/http" +) + +// Values of Status.Status +const ( + StatusSuccess = "Success" + StatusFailure = "Failure" +) + +// StatusReason is an enumeration of possible failure causes. Each StatusReason +// must map to a single HTTP status code, but multiple reasons may map +// to the same HTTP status code. +// TODO: move to apiserver +type StatusReason string + +const ( + // NonZeroExitCodeReason indicates that the command executing failed with non zero exit code. + NonZeroExitCodeReason StatusReason = "NonZeroExitCode" + + // StatusReasonInternalError indicates that an internal error occurred, it is unexpected + // and the outcome of the call is unknown. + // Details (optional): + // "causes" - The original error + // Status code 500 + StatusReasonInternalError StatusReason = "InternalError" +) + +// CauseType is a machine readable value providing more detail about what +// occurred in a status response. An operation may have multiple causes for a +// status (whether Failure or Success). +type CauseType string + +const ( + // ExitCodeCauseType indicates that the status cause is the command's exit code is not zero. + ExitCodeCauseType CauseType = "ExitCode" +) + +// StatusCause provides more information about an api.Status failure, including +// cases when multiple errors are encountered. +type StatusCause struct { + // A machine-readable description of the cause of the error. If this value is + // empty there is no information available. + // +optional + Type CauseType `json:"reason,omitempty" protobuf:"bytes,1,opt,name=reason,casttype=CauseType"` + // A human-readable description of the cause of the error. This field may be + // presented as-is to a reader. + // +optional + Message string `json:"message,omitempty" protobuf:"bytes,2,opt,name=message"` +} + +// StatusDetails is a set of additional properties that MAY be set by the +// server to provide additional information about a response. The Reason +// field of a Status object defines what attributes will be set. Clients +// must ignore fields that do not match the defined type of each attribute, +// and should assume that any attribute may be empty, invalid, or under +// defined. +type StatusDetails struct { + Causes []StatusCause `json:"causes,omitempty" protobuf:"bytes,4,rep,name=causes"` +} + +// Status is a return value for calls that don't return other objects. +type Status struct { + // Status of the operation. + // One of: "Success" or "Failure". + // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status + // +optional + Status string `json:"status,omitempty" protobuf:"bytes,2,opt,name=status"` + // A human-readable description of the status of this operation. + // +optional + Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"` + // A machine-readable description of why this operation is in the + // "Failure" status. If this value is empty there + // is no information available. A Reason clarifies an HTTP status + // code but does not override it. + // +optional + Reason StatusReason `json:"reason,omitempty" protobuf:"bytes,4,opt,name=reason,casttype=StatusReason"` + // Extended data associated with the reason. Each reason may define its + // own extended details. This field is optional and the data returned + // is not guaranteed to conform to any schema except that defined by + // the reason type. + // +optional + Details *StatusDetails `json:"details,omitempty" protobuf:"bytes,5,opt,name=details"` + // Suggested HTTP return code for this status, 0 if not set. + // +optional + Code int32 `json:"code,omitempty" protobuf:"varint,6,opt,name=code"` +} + +// StatusError is an error intended for consumption by a REST API server; it can also be +// reconstructed by clients from a REST response. Public to allow easy type switches. +type StatusError struct { + ErrStatus Status +} + +var _ error = &StatusError{} + +// Error implements the Error interface. +func (e *StatusError) Error() string { + return e.ErrStatus.Message +} + +// Status allows access to e's status without having to know the detailed workings +// of StatusError. +func (e *StatusError) Status() Status { + return e.ErrStatus +} + +// NewInternalError returns an error indicating the item is invalid and cannot be processed. +func NewInternalError(err error) *StatusError { + return &StatusError{Status{ + Status: StatusFailure, + Code: http.StatusInternalServerError, + Reason: StatusReasonInternalError, + Details: &StatusDetails{ + Causes: []StatusCause{{Message: err.Error()}}, + }, + Message: fmt.Sprintf("Internal error occurred: %v", err), + }} +} diff --git a/cri/stream/remotecommand/exec.go b/cri/stream/remotecommand/exec.go index 9a2ffa774..7d45c6adc 100644 --- a/cri/stream/remotecommand/exec.go +++ b/cri/stream/remotecommand/exec.go @@ -1,6 +1,7 @@ package remotecommand import ( + "fmt" "net/http" "time" ) @@ -8,7 +9,7 @@ import ( // Executor knows how to execute a command in a container of the pod. type Executor interface { // Exec executes a command in a container of the pod. - Exec(containerID string, cmd []string, streamOpts *Options, streams *Streams) error + Exec(containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error) } // ServeExec handles requests to execute a command in a container. After @@ -22,9 +23,31 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, cont } defer ctx.conn.Close() - executor.Exec(container, cmd, streamOpts, &Streams{ + exitCode, err := executor.Exec(container, cmd, streamOpts, &Streams{ StdinStream: ctx.stdinStream, StdoutStream: ctx.stdoutStream, StderrStream: ctx.stderrStream, }) + if err != nil { + err = fmt.Errorf("error executing command in container: %v", err) + ctx.writeStatus(NewInternalError(err)) + } else if exitCode != 0 { + ctx.writeStatus(&StatusError{ErrStatus: Status{ + Status: StatusFailure, + Reason: NonZeroExitCodeReason, + Details: &StatusDetails{ + Causes: []StatusCause{ + { + Type: ExitCodeCauseType, + Message: fmt.Sprintf("%d", exitCode), + }, + }, + }, + Message: fmt.Sprintf("command terminated with non-zero exit code"), + }}) + } else { + ctx.writeStatus(&StatusError{ErrStatus: Status{ + Status: StatusSuccess, + }}) + } } diff --git a/cri/stream/remotecommand/httpstream.go b/cri/stream/remotecommand/httpstream.go index ac6e4e343..9d47f42dc 100644 --- a/cri/stream/remotecommand/httpstream.go +++ b/cri/stream/remotecommand/httpstream.go @@ -1,6 +1,7 @@ package remotecommand import ( + "encoding/json" "fmt" "io" "net/http" @@ -41,6 +42,7 @@ type context struct { stdoutStream io.WriteCloser stderrStream io.WriteCloser resizeStream io.ReadCloser + writeStatus func(status *StatusError) error tty bool } @@ -165,6 +167,7 @@ WaitForStreams: streamType := stream.Headers().Get(constant.StreamType) switch streamType { case constant.StreamTypeError: + ctx.writeStatus = v1WriteStatusFunc(stream) go waitStreamReply(stream.replySent, replyChan, stop) case constant.StreamTypeStdin: ctx.stdinStream = stream @@ -209,7 +212,7 @@ WaitForStreams: streamType := stream.Headers().Get(constant.StreamType) switch streamType { case constant.StreamTypeError: - // ctx.writeStatus = v1WriteStatusFunc(stream) + ctx.writeStatus = v1WriteStatusFunc(stream) go waitStreamReply(stream.replySent, replyChan, stop) case constant.StreamTypeStdin: ctx.stdinStream = stream @@ -241,3 +244,26 @@ WaitForStreams: return ctx, nil } + +func v1WriteStatusFunc(stream io.Writer) func(status *StatusError) error { + return func(status *StatusError) error { + if status.Status().Status == StatusSuccess { + return nil + } + + _, err := stream.Write([]byte(status.Error())) + return err + } +} + +func v4WriteStatusFunc(stream io.Writer) func(status *StatusError) error { + return func(status *StatusError) error { + bs, err := json.Marshal(status.Status()) + if err != nil { + return err + } + + _, err = stream.Write(bs) + return err + } +} diff --git a/cri/stream/remotecommand/websocket.go b/cri/stream/remotecommand/websocket.go index e200daa90..edb62b6a8 100644 --- a/cri/stream/remotecommand/websocket.go +++ b/cri/stream/remotecommand/websocket.go @@ -78,7 +78,7 @@ func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Opti }, }) conn.SetIdleTimeout(idleTimeout) - _, streams, err := conn.Open(w, req) + negotiatedProtocol, streams, err := conn.Open(w, req) if err != nil { runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err)) return nil, false @@ -104,7 +104,12 @@ func createWebSocketStreams(w http.ResponseWriter, req *http.Request, opts *Opti tty: opts.TTY, } - // TODO: handle write status function. + switch negotiatedProtocol { + case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol: + ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel]) + default: + ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel]) + } return ctx, true } diff --git a/cri/stream/server.go b/cri/stream/server.go index b5d3a8456..a82504299 100644 --- a/cri/stream/server.go +++ b/cri/stream/server.go @@ -53,7 +53,7 @@ type Server interface { // Runtime is the interface to execute the commands and provide the streams. type Runtime interface { // Exec executes the command in pod. - Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error + Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) // Attach attaches to pod. Attach(containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error diff --git a/daemon/mgr/cri_stream.go b/daemon/mgr/cri_stream.go index 94da9db7b..c6add2d77 100644 --- a/daemon/mgr/cri_stream.go +++ b/daemon/mgr/cri_stream.go @@ -32,7 +32,7 @@ func newStreamRuntime(ctrMgr ContainerMgr) stream.Runtime { } // Exec executes a command inside the container. -func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error { +func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) { createConfig := &apitypes.ExecCreateConfig{ Cmd: cmd, AttachStdin: streamOpts.Stdin, @@ -45,7 +45,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot execid, err := s.containerMgr.CreateExec(ctx, containerID, createConfig) if err != nil { - return fmt.Errorf("failed to create exec for container %q: %v", containerID, err) + return 0, fmt.Errorf("failed to create exec for container %q: %v", containerID, err) } startConfig := &apitypes.ExecStartConfig{} @@ -55,18 +55,18 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, streamOpts *remot err = s.containerMgr.StartExec(ctx, execid, startConfig, attachConfig) if err != nil { - return fmt.Errorf("failed to start exec for container %q: %v", containerID, err) + return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err) } ei, err := s.containerMgr.InspectExec(ctx, execid) if err != nil { - return fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err) + return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err) } // Not return until exec finished. - <-ei.ExitCh + result := <-ei.ExitCh - return nil + return result.ExitCode(), nil } // Attach attaches to a running container.