Skip to content

Commit

Permalink
support log stream options
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 11, 2020
1 parent ef78612 commit 1278f37
Show file tree
Hide file tree
Showing 17 changed files with 427 additions and 315 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20.03.01
20.03.02
4 changes: 3 additions & 1 deletion cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/projecteru2/core/cluster"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -51,7 +52,8 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
}

var outStream io.ReadCloser
if outStream, err = container.Engine.VirtualizationLogs(ctx, message.ContainerID, true, true, true); err != nil {
if outStream, err = container.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{
ID: message.ContainerID, Follow: true, Stdout: true, Stderr: true}); err != nil {
log.Errorf("[RunAndWait] Can't fetch log of container %s error %v", message.ContainerID, err)
return
}
Expand Down
16 changes: 10 additions & 6 deletions cluster/calcium/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,34 @@ import (
"bufio"
"context"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
)

// LogStream log stream for one container
func (c *Calcium) LogStream(ctx context.Context, ID string) (chan *types.LogStreamMessage, error) {
func (c *Calcium) LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error) {
ch := make(chan *types.LogStreamMessage)
go func() {
defer close(ch)
container, err := c.GetContainer(ctx, ID)
container, err := c.GetContainer(ctx, opts.ID)
if err != nil {
ch <- &types.LogStreamMessage{ID: ID, Error: err}
ch <- &types.LogStreamMessage{ID: opts.ID, Error: err}
return
}

resp, err := container.Engine.VirtualizationLogs(ctx, ID, true, true, true)
resp, err := container.Engine.VirtualizationLogs(ctx, &enginetypes.VirtualizationLogStreamOptions{
ID: opts.ID, Tail: opts.Tail, Since: opts.Since, Until: opts.Until,
Follow: true, Stdout:true, Stderr: true,
})
if err != nil {
ch <- &types.LogStreamMessage{ID: ID, Error: err}
ch <- &types.LogStreamMessage{ID: opts.ID, Error: err}
return
}

scanner := bufio.NewScanner(resp)
for scanner.Scan() {
data := scanner.Bytes()
ch <- &types.LogStreamMessage{ID: ID, Data: data}
ch <- &types.LogStreamMessage{ID: opts.ID, Data: data}
}
}()
return ch, nil
Expand Down
11 changes: 6 additions & 5 deletions cluster/calcium/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,28 @@ func TestLogStream(t *testing.T) {
Engine: engine,
}
ctx := context.Background()
opts := &types.LogStreamOptions{ID: ID}
// failed by GetContainer
store.On("GetContainer", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.LogStream(ctx, ID)
ch, err := c.LogStream(ctx, opts)
assert.NoError(t, err)
for c := range ch {
assert.Equal(t, c.ID, ID)
assert.Empty(t, c.Data)
}
store.On("GetContainer", mock.Anything, mock.Anything).Return(container, nil)
// failed by VirtualizationLogs
engine.On("VirtualizationLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNodeExist).Once()
ch, err = c.LogStream(ctx, ID)
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(nil, types.ErrNodeExist).Once()
ch, err = c.LogStream(ctx, opts)
assert.NoError(t, err)
for c := range ch {
assert.Equal(t, c.ID, ID)
assert.Empty(t, c.Data)
}
reader := bytes.NewBufferString("aaaa\nbbbb\n")
engine.On("VirtualizationLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ioutil.NopCloser(reader), nil)
engine.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(ioutil.NopCloser(reader), nil)
// success
ch, err = c.LogStream(ctx, ID)
ch, err = c.LogStream(ctx, opts)
assert.NoError(t, err)
for c := range ch {
assert.Equal(t, c.ID, ID)
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Cluster interface {
DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes types.VolumeBindings) (chan *types.ReallocResourceMessage, error)
LogStream(ctx context.Context, ID string) (chan *types.LogStreamMessage, error)
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
// finalizer
Expand Down
26 changes: 15 additions & 11 deletions cluster/mocks/Cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions engine/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ func (e *Engine) VirtualizationInspect(ctx context.Context, ID string) (*enginet
}

// VirtualizationLogs show virtualization logs
func (e *Engine) VirtualizationLogs(ctx context.Context, ID string, follow, stdout, stderr bool) (io.ReadCloser, error) {
logsOpts := dockertypes.ContainerLogsOptions{Follow: follow, ShowStdout: stdout, ShowStderr: stderr}
resp, err := e.client.ContainerLogs(ctx, ID, logsOpts)
func (e *Engine) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (io.ReadCloser, error) {
logsOpts := dockertypes.ContainerLogsOptions{Follow: opts.Follow, ShowStdout: opts.Stdout, ShowStderr: opts.Stderr, Tail: opts.Tail}
resp, err := e.client.ContainerLogs(ctx, opts.ID, logsOpts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type API interface {
VirtualizationStop(ctx context.Context, ID string) error
VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error
VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error)
VirtualizationLogs(ctx context.Context, ID string, follow, stdout, stderr bool) (io.ReadCloser, error)
VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (io.ReadCloser, error)
VirtualizationAttach(ctx context.Context, ID string, stream, stdin bool) (io.ReadCloser, io.WriteCloser, error)
VirtualizationResize(ctx context.Context, ID string, height, width uint) error
VirtualizationWait(ctx context.Context, ID, state string) (*enginetypes.VirtualizationWaitResult, error)
Expand Down
29 changes: 17 additions & 12 deletions engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/mocks/fakeengine/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint
vcJSON := &enginetypes.VirtualizationInfo{ID: ID, Image: "mock-image", Running: true, Networks: map[string]string{"mock-network": "1.1.1.1"}}
e.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(vcJSON, nil)
logs := ioutil.NopCloser(bytes.NewBufferString("logs1...\nlogs2...\n"))
e.On("VirtualizationLogs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(logs, nil)
e.On("VirtualizationLogs", mock.Anything, mock.Anything).Return(logs, nil)
attachData := ioutil.NopCloser(bytes.NewBufferString("logs1...\nlogs2...\n"))
bw := bufio.NewWriter(bytes.NewBuffer([]byte{}))
writeBuffer := &writeCloser{bw}
Expand Down
2 changes: 1 addition & 1 deletion engine/systemd/virtualization.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *SSHClient) VirtualizationInspect(ctx context.Context, ID string) (info
}

// VirtualizationLogs fetches service logs
func (s *SSHClient) VirtualizationLogs(ctx context.Context, ID string, follow, stdout, stderr bool) (reader io.ReadCloser, err error) {
func (s *SSHClient) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (reader io.ReadCloser, err error) {
err = types.ErrEngineNotImplemented
return
}
Expand Down
12 changes: 12 additions & 0 deletions engine/types/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

// VirtualizationLogStreamOptions .
type VirtualizationLogStreamOptions struct {
ID string
Tail string
Since string
Until string
Follow bool
Stdout bool
Stderr bool
}
2 changes: 1 addition & 1 deletion engine/virt/virt.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (v *Virt) VirtualizationInspect(ctx context.Context, ID string) (*enginetyp
}

// VirtualizationLogs streams a specific guest's log.
func (v *Virt) VirtualizationLogs(ctx context.Context, ID string, follow, stdout, stderr bool) (io.ReadCloser, error) {
func (v *Virt) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (reader io.ReadCloser, err error) {
return nil, fmt.Errorf("VirtualizationLogs does not implement")
}

Expand Down
Loading

0 comments on commit 1278f37

Please sign in to comment.