Skip to content

Commit

Permalink
Merge pull request #112 from gongweibao/filemanager2
Browse files Browse the repository at this point in the history
FileManager
  • Loading branch information
gongweibao authored Jun 7, 2017
2 parents e82c533 + 00880ae commit f647640
Show file tree
Hide file tree
Showing 22 changed files with 2,542 additions and 7 deletions.
4 changes: 4 additions & 0 deletions go/cmd/paddlecloud/paddlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func main() {
subcommands.Register(&paddlecloud.LogsCommand{}, "")
subcommands.Register(&paddlecloud.GetCommand{}, "")
subcommands.Register(&paddlecloud.KillCommand{}, "")
subcommands.Register(&paddlecloud.LsCommand{}, "")
subcommands.Register(&paddlecloud.CpCommand{}, "")
subcommands.Register(&paddlecloud.RmCommand{}, "")
subcommands.Register(&paddlecloud.MkdirCommand{}, "")

flag.Parse()
ctx := context.Background()
Expand Down
24 changes: 24 additions & 0 deletions go/cmd/pfsserver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"flag"
"fmt"
"net/http"

"github.com/PaddlePaddle/cloud/go/filemanager/pfsserver"
log "github.com/golang/glog"
)

func main() {
port := flag.Int("port", 8080, "port of server")
ip := flag.String("ip", "0.0.0.0", "ip of server")
tokenUri := flag.String("tokenuri", "http://cloud.paddlepaddle.org", "uri of token server")
flag.Parse()

router := pfsserver.NewRouter()
addr := fmt.Sprintf("%s:%d", *ip, *port)
pfsserver.TokenUri = *tokenUri

log.Infof("server on:%s and tokenuri:%s\n", addr, *tokenUri)
log.Fatal(http.ListenAndServe(addr, router))
}
98 changes: 98 additions & 0 deletions go/filemanager/pfsmodules/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package pfsmodules

import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strconv"

log "github.com/golang/glog"
)

// Chunk respresents a chunk info.
type Chunk struct {
Path string
Offset int64
Size int64
}

// ToURLParam encodes variables to url encoding parameters.
func (p *Chunk) ToURLParam() url.Values {
parameters := url.Values{}
parameters.Add("path", p.Path)

str := fmt.Sprint(p.Offset)
parameters.Add("offset", str)

str = fmt.Sprint(p.Size)
parameters.Add("chunksize", str)

return parameters
}

// ParseChunk get a Chunk struct from path.
// path example:
// path=/pfs/datacenter1/1.txt&offset=4096&chunksize=4096
func ParseChunk(path string) (*Chunk, error) {
cmd := Chunk{}

m, err := url.ParseQuery(path)
if err != nil ||
len(m["path"]) == 0 ||
len(m["offset"]) == 0 ||
len(m["chunksize"]) == 0 {
return nil, errors.New(StatusJSONErr)
}

cmd.Path = m["path"][0]
cmd.Offset, err = strconv.ParseInt(m["offset"][0], 10, 64)
if err != nil {
return nil, errors.New(StatusJSONErr)
}

chunkSize, err := strconv.ParseInt(m["chunksize"][0], 10, 64)
if err != nil {
return nil, errors.New(StatusBadChunkSize)
}
cmd.Size = chunkSize

return &cmd, nil
}

// LoadChunkData loads a specified chunk to io.Writer.
func (p *Chunk) LoadChunkData(w io.Writer) error {
f, err := os.Open(p.Path)
if err != nil {
return err
}
defer Close(f)

_, err = f.Seek(p.Offset, 0)
if err != nil {
return err
}

loaded, err := io.CopyN(w, f, p.Size)
log.V(2).Infof("loaded:%d\n", loaded)
return err
}

// SaveChunkData save data from io.Reader.
func (p *Chunk) SaveChunkData(r io.Reader) error {
f, err := os.OpenFile(p.Path, os.O_WRONLY, 0600)
if err != nil {
return err
}
defer Close(f)

_, err = f.Seek(p.Offset, 0)
if err != nil {
return err
}

writen, err := io.CopyN(f, r, p.Size)
log.V(2).Infof("chunksize:%d writen:%d\n", p.Size, writen)
return err
}
203 changes: 203 additions & 0 deletions go/filemanager/pfsmodules/chunkmeta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package pfsmodules

import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sort"
"strconv"
)

const (
defaultMaxChunkSize = 4 * 1024 * 1024
defaultMinChunkSize = 4 * 1024
)
const (
ChunkMetaCmdName = "GetChunkMeta"
)

// ChunkMeta holds the chunk meta's info.
type ChunkMeta struct {
Offset int64 `json:"offset"`
Checksum string `json:"checksum"`
Len int64 `json:"len"`
}

// ChunkMetaCmd is a command.
type ChunkMetaCmd struct {
Method string `json:"method"`
FilePath string `json:"path"`
ChunkSize int64 `json:"chunksize"`
}

// ToURLParam encodes ChunkMetaCmd to URL encoding string.
func (p *ChunkMetaCmd) ToURLParam() url.Values {
parameters := url.Values{}
parameters.Add("method", p.Method)
parameters.Add("path", p.FilePath)

str := fmt.Sprint(p.ChunkSize)
parameters.Add("chunksize", str)

return parameters
}

// ToJSON encodes ChunkMetaCmd to JSON string.
func (p *ChunkMetaCmd) ToJSON() ([]byte, error) {
return json.Marshal(p)
}

// Run is a functions which run ChunkMetaCmd.
func (p *ChunkMetaCmd) Run() (interface{}, error) {
return GetChunkMeta(p.FilePath, p.ChunkSize)
}

func (p *ChunkMetaCmd) checkChunkSize() error {
if p.ChunkSize < defaultMinChunkSize ||
p.ChunkSize > defaultMaxChunkSize {
return errors.New(StatusBadChunkSize)
}

return nil
}

// CloudCheck checks the conditions when running on cloud.
func (p *ChunkMetaCmd) ValidateCloudArgs(userName string) error {
if err := ValidatePfsPath([]string{p.FilePath}, userName); err != nil {
return err
}

return p.checkChunkSize()
}

// LocalCheck checks the conditions when running locally.
func (p *ChunkMetaCmd) ValidateLocalArgs() error {
return p.checkChunkSize()
}

// NewChunkMetaCmdFromURLParams get a new ChunkMetaCmd.
func NewChunkMetaCmdFromURLParam(r *http.Request) (*ChunkMetaCmd, error) {
method := r.URL.Query().Get("method")
path := r.URL.Query().Get("path")
chunkStr := r.URL.Query().Get("chunksize")

if len(method) == 0 ||
method != ChunkMetaCmdName ||
len(path) == 0 ||
len(chunkStr) == 0 {
return nil, errors.New(http.StatusText(http.StatusBadRequest))
}

chunkSize, err := strconv.ParseInt(chunkStr, 10, 64)
if err != nil {
return nil, errors.New(StatusBadChunkSize)
}

return &ChunkMetaCmd{
Method: method,
FilePath: path,
ChunkSize: chunkSize,
}, nil
}

type metaSlice []ChunkMeta

func (a metaSlice) Len() int { return len(a) }
func (a metaSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a metaSlice) Less(i, j int) bool { return a[i].Offset < a[j].Offset }

// GetDiffChunkMeta gets difference between srcMeta and dstMeta.
func GetDiffChunkMeta(srcMeta []ChunkMeta, dstMeta []ChunkMeta) ([]ChunkMeta, error) {
if len(dstMeta) == 0 || len(srcMeta) == 0 {
return srcMeta, nil
}

if !sort.IsSorted(metaSlice(srcMeta)) {
sort.Sort(metaSlice(srcMeta))
}

if !sort.IsSorted(metaSlice(dstMeta)) {
sort.Sort(metaSlice(dstMeta))
}

dstIdx := 0
srcIdx := 0
diff := make([]ChunkMeta, 0, len(srcMeta))

for {
if srcMeta[srcIdx].Offset < dstMeta[dstIdx].Offset {
diff = append(diff, srcMeta[srcIdx])
srcIdx++
} else if srcMeta[srcIdx].Offset > dstMeta[dstIdx].Offset {
dstIdx++
} else {
if srcMeta[srcIdx].Checksum != dstMeta[dstIdx].Checksum {
diff = append(diff, srcMeta[srcIdx])
}

dstIdx++
srcIdx++
}

if dstIdx >= len(dstMeta) {
break
}

if srcIdx >= len(srcMeta) {
break
}
}

if srcIdx < len(srcMeta) {
diff = append(diff, srcMeta[srcIdx:]...)
}

return diff, nil
}

// GetChunkMeta gets chunk metas from path of file.
func GetChunkMeta(path string, len int64) ([]ChunkMeta, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer Close(f)

if len > defaultMaxChunkSize || len < defaultMinChunkSize {
return nil, errors.New(StatusBadChunkSize)
}

var metas []ChunkMeta

data := make([]byte, len)
offset := int64(0)

for {
n, err := f.Read(data)
if err != nil && err != io.EOF {
return nil, err
}

if err == io.EOF {
break
}

m := ChunkMeta{}
m.Offset = offset
sum := md5.Sum(data[:n])
m.Checksum = hex.EncodeToString(sum[:])
m.Len = int64(n)

metas = append(metas, m)

offset += int64(n)
}

return metas, nil
}
Loading

0 comments on commit f647640

Please sign in to comment.