Skip to content

Commit

Permalink
docker engine BuildFromExist (#207)
Browse files Browse the repository at this point in the history
* engine.PushImage returns message channel

* implement new PushImage for engines

* implement BuildFromExist for docker

* pass test

* bump version: 20.06.01

* engine.ImagePull return channel too
  • Loading branch information
zc authored Jun 8, 2020
1 parent bd905b7 commit 9137c57
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 50 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20.05.01
20.06.01
23 changes: 8 additions & 15 deletions cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,27 +140,20 @@ func (c *Calcium) pushImage(ctx context.Context, resp io.ReadCloser, node *types
for i := range tags {
tag := tags[i]
log.Infof("[BuildImage] Push image %s", tag)
rc, err := node.Engine.ImagePush(ctx, tag)
pushCh, err := node.Engine.ImagePush(ctx, tag)
if err != nil {
ch <- makeErrorBuildImageMessage(err)
continue
}
defer rc.Close()

decoder2 := json.NewDecoder(rc)
for {
message := &types.BuildImageMessage{}
err := decoder2.Decode(message)
if err != nil {
if err == io.EOF {
break
}
malformed := []byte{}
_, _ = decoder2.Buffered().Read(malformed)
log.Errorf("[BuildImage] Decode push image message failed %v, buffered: %v", err, malformed)
break
for m := range pushCh {
ch <- &types.BuildImageMessage{
ID: m.ID,
Status: m.Status,
Progress: m.Progress,
Error: m.Error,
Stream: m.Stream,
}
ch <- message
}

// 无论如何都删掉build机器的
Expand Down
16 changes: 13 additions & 3 deletions cluster/calcium/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/mock"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -101,10 +102,8 @@ func TestBuild(t *testing.T) {
buildImageMessage.ErrorDetail.Code = 0
buildImageResp, err := json.Marshal(buildImageMessage)
assert.NoError(t, err)
buildImageResp2, err := json.Marshal(buildImageMessage)
assert.NoError(t, err)
buildImageRespReader := ioutil.NopCloser(bytes.NewReader(buildImageResp))
buildImageRespReader2 := ioutil.NopCloser(bytes.NewReader(buildImageResp2))
engine.On("BuildRefs", mock.Anything, mock.Anything, mock.Anything).Return([]string{"t1", "t2"})
// failed by build context
engine.On("BuildContent", mock.Anything, mock.Anything, mock.Anything).Return("", nil, types.ErrBadCount).Once()
Expand All @@ -130,7 +129,18 @@ func TestBuild(t *testing.T) {
assert.Error(t, err)
// correct
engine.On("ImageBuild", mock.Anything, mock.Anything, mock.Anything).Return(buildImageRespReader, nil)
engine.On("ImagePush", mock.Anything, mock.Anything).Return(buildImageRespReader2, nil)
pushCh := make(chan *enginetypes.ImageMessage)
go func() {
defer close(pushCh)
pushCh <- &enginetypes.ImageMessage{
Progress: buildImageMessage.Progress,
Error: buildImageMessage.Error,
ID: buildImageMessage.ID,
Status: buildImageMessage.Status,
Stream: buildImageMessage.Stream,
}
}()
engine.On("ImagePush", mock.Anything, mock.Anything).Return(pushCh, nil)
engine.On("ImageRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)
engine.On("ImageBuildCachePrune", mock.Anything, mock.Anything).Return(uint64(1024), nil)
engine.On("BuildContent", mock.Anything, mock.Anything, mock.Anything).Return("", nil, nil)
Expand Down
13 changes: 6 additions & 7 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"

"bufio"

Expand All @@ -26,12 +25,12 @@ type window struct {

// As the name says,
// blocks until the stream is empty, until we meet EOF
func ensureReaderClosed(stream io.ReadCloser) {
if stream == nil {
func ensureChanClosed(ch chan *enginetypes.ImageMessage) {
if ch == nil {
return
}
io.Copy(ioutil.Discard, stream)
stream.Close()
for range ch {
}
}

func execuateInside(ctx context.Context, client engine.API, ID, cmd, user string, env []string, privileged bool) ([]byte, error) {
Expand Down Expand Up @@ -109,12 +108,12 @@ func pullImage(ctx context.Context, node *types.Node, image string) error {
}

log.Info("[pullImage] Image not cached, pulling")
outStream, err := node.Engine.ImagePull(ctx, image, false)
messageCh, err := node.Engine.ImagePull(ctx, image, false)
if err != nil {
log.Errorf("[pullImage] Error during pulling image %s: %v", image, err)
return err
}
ensureReaderClosed(outStream)
ensureChanClosed(messageCh)
log.Infof("[pullImage] Done pulling image %s", image)
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions cluster/calcium/image_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package calcium

import (
"bytes"
"context"
"io/ioutil"
"testing"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -85,7 +84,9 @@ func TestCacheImage(t *testing.T) {
}
engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("yy", nil)
engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{"xx"}, nil)
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(ioutil.NopCloser(bytes.NewReader([]byte{})), nil)
imageCh := make(chan *enginetypes.ImageMessage)
close(imageCh)
engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(imageCh, nil)
// succ
ch, err = c.CacheImage(ctx, "", "", []string{"xx"}, 0)
for c := range ch {
Expand Down
27 changes: 27 additions & 0 deletions engine/docker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,30 @@ func dumpFromString(ca, cert, key *os.File, caStr, certStr, keyStr string) error
log.Debug("[dumpFromString] Dump ca.pem, cert.pem, key.pem from string")
return nil
}

func parseDockerImageMessages(reader io.ReadCloser) chan *enginetypes.ImageMessage {
ch := make(chan *enginetypes.ImageMessage)
go func() {
defer close(ch)
defer reader.Close()

decoder := json.NewDecoder(reader)
for {
message := &enginetypes.ImageMessage{}
err := decoder.Decode(message)
if err != nil {
if err == io.EOF {
break
}
malformed := []byte{}
_, _ = decoder.Buffered().Read(malformed)
log.Errorf("[parseDockerImageMessages] Decode image message failed %v, buffered: %v", err, malformed)
message.Error = err.Error()
ch <- message
break
}
ch <- message
}
}()
return ch
}
43 changes: 37 additions & 6 deletions engine/docker/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

dockertypes "github.com/docker/docker/api/types"
dockerfilters "github.com/docker/docker/api/types/filters"
log "github.com/sirupsen/logrus"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
)

// ImageList list image
Expand Down Expand Up @@ -66,23 +66,25 @@ func (e *Engine) ImagesPrune(ctx context.Context) error {
}

// ImagePull pull Image
func (e *Engine) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) {
func (e *Engine) ImagePull(ctx context.Context, ref string, all bool) (chan *enginetypes.ImageMessage, error) {
auth, err := makeEncodedAuthConfigFromRemote(e.config.Docker.AuthConfigs, ref)
if err != nil {
return nil, err
}
pullOptions := dockertypes.ImagePullOptions{All: all, RegistryAuth: auth}
return e.client.ImagePull(ctx, ref, pullOptions)
reader, err := e.client.ImagePull(ctx, ref, pullOptions)
return parseDockerImageMessages(reader), err
}

// ImagePush push image
func (e *Engine) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) {
func (e *Engine) ImagePush(ctx context.Context, ref string) (chan *enginetypes.ImageMessage, error) {
auth, err := makeEncodedAuthConfigFromRemote(e.config.Docker.AuthConfigs, ref)
if err != nil {
return nil, err
}
pushOptions := dockertypes.ImagePushOptions{RegistryAuth: auth}
return e.client.ImagePush(ctx, ref, pushOptions)
reader, err := e.client.ImagePush(ctx, ref, pushOptions)
return parseDockerImageMessages(reader), err
}

// ImageBuild build image
Expand Down Expand Up @@ -119,7 +121,36 @@ func (e *Engine) ImageBuild(ctx context.Context, input io.Reader, refs []string)

// ImageBuildFromExist commits image from running container
func (e *Engine) ImageBuildFromExist(ctx context.Context, ID, name string) (imageID string, err error) {
return "", types.ErrEngineNotImplemented
opts := dockertypes.ContainerCommitOptions{
Reference: name,
Author: "eru-core",
}
resp, err := e.client.ContainerCommit(ctx, ID, opts)
if err != nil {
return "", err
}
defer func() {
if _, err := e.ImageRemove(context.Background(), resp.ID, true, true); err != nil {
log.Errorf("[ImageBuildFromExist] failed to remove built image %s: %+v", resp.ID, err)
return
}
if _, err := e.ImageBuildCachePrune(context.Background(), true); err != nil {
log.Errorf("[ImageBuildFromExist] failed to clean build cache: %+v", err)
}
}()

ch, err := e.ImagePush(ctx, name)
if err != nil {
return "", err
}

for m := range ch {
if m.Error != "" {
return "", fmt.Errorf("failed to push image %s: %s", name, m.Error)
}
}

return resp.ID, nil
}

// ImageBuildCachePrune prune build cache
Expand Down
4 changes: 2 additions & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type API interface {
ImageList(ctx context.Context, image string) ([]*enginetypes.Image, error)
ImageRemove(ctx context.Context, image string, force, prune bool) ([]string, error)
ImagesPrune(ctx context.Context) error
ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error)
ImagePush(ctx context.Context, ref string) (io.ReadCloser, error)
ImagePull(ctx context.Context, ref string, all bool) (chan *enginetypes.ImageMessage, error)
ImagePush(ctx context.Context, ref string) (chan *enginetypes.ImageMessage, error)
ImageBuild(ctx context.Context, input io.Reader, refs []string) (io.ReadCloser, error)
ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error)
ImageLocalDigests(ctx context.Context, image string) ([]string, error)
Expand Down
16 changes: 8 additions & 8 deletions engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions engine/systemd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func (s *SSHClient) ImagesPrune(ctx context.Context) (err error) {
}

// ImagePull pulls image
func (s *SSHClient) ImagePull(ctx context.Context, ref string, all bool) (reader io.ReadCloser, err error) {
func (s *SSHClient) ImagePull(ctx context.Context, ref string, all bool) (ch chan *enginetypes.ImageMessage, err error) {
return
}

// ImagePush pushes image
func (s *SSHClient) ImagePush(ctx context.Context, ref string) (reader io.ReadCloser, err error) {
func (s *SSHClient) ImagePush(ctx context.Context, ref string) (ch chan *enginetypes.ImageMessage, err error) {
err = types.ErrEngineNotImplemented
return
}
Expand Down
8 changes: 8 additions & 0 deletions engine/types/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ type Build struct {
Cache map[string]string `yaml:"cache,omitempty,flow"`
StopSignal string `yaml:"stop_signal,omitempty,flow"`
}

type ImageMessage struct {
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
Progress string `json:"progress,omitempty"`
ID string `json:"id,omitempty"`
Error string `json:"error,omitempty"`
}
6 changes: 3 additions & 3 deletions engine/virt/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func (v *Virt) ImagesPrune(ctx context.Context) (err error) {
}

// ImagePull pulls an image to local virt-node.
func (v *Virt) ImagePull(ctx context.Context, ref string, all bool) (stream io.ReadCloser, err error) {
func (v *Virt) ImagePull(ctx context.Context, ref string, all bool) (ch chan *enginetypes.ImageMessage, err error) {
return
}

// ImagePush pushes to central image registry.
func (v *Virt) ImagePush(ctx context.Context, ref string) (rc io.ReadCloser, err error) {
func (v *Virt) ImagePush(ctx context.Context, ref string) (ch chan *enginetypes.ImageMessage, err error) {
log.Warnf("does not implement")
return
}
Expand All @@ -53,7 +53,7 @@ func (v *Virt) ImageBuildFromExist(ctx context.Context, ID, name string) (string
// TODO: removes below 2 lines
// upper layer may remove 'hub.docker.io/...../<name>' prefix and tag from the name.
// due to the domain and tag both are docker concepts.
// Removes domain part.
// Removes domain part.
name = filepath.Base(name)
// Removes tag (latest by default)
name = strings.Split(name, ":")[0]
Expand Down

0 comments on commit 9137c57

Please sign in to comment.