Skip to content

Commit

Permalink
Changes wrt govet/golint suggestions; client code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Feb 2, 2017
1 parent 86af10d commit 341c4ee
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 62 deletions.
145 changes: 94 additions & 51 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,32 @@ import (
"github.com/vkuznet/transfer2go/utils"
)

// Transfer client function is responsible to initiate transfer request from
// source to destination.
func Transfer(agent, src, dst string) error {
type UserRequest struct {
SrcUrl string
SrcAlias string
SrcFile string
DstUrl string
DstAlias string
DstFile string
Transfer bool
Upload bool
}

// String returns string representation of UserRequest struct
func (u *UserRequest) String() string {
var action string
if u.Transfer {
action = "transfer"
}
if u.Upload {
action = "upload"
}
return fmt.Sprintf("<UserRequest %s %s(%s) %s => %s(%s) %s>", action, u.SrcAlias, u.SrcUrl, u.SrcFile, u.DstAlias, u.DstUrl, u.DstFile)
}

// helper function to parse source and destination parameters
func parse(agent, src, dst string) (UserRequest, error) {
var req UserRequest
var transfer, upload bool
var srcFile, srcAlias, srcUrl, dstFile, dstAlias, dstUrl string
if stat, err := os.Stat(src); err == nil && !stat.IsDir() {
Expand Down Expand Up @@ -50,12 +73,12 @@ func Transfer(agent, src, dst string) error {
url := fmt.Sprintf("%s/agents", agent)
resp := utils.FetchResponse(url, []byte{})
if resp.Error != nil {
return resp.Error
return req, resp.Error
}
var remoteAgents map[string]string
e := json.Unmarshal(resp.Data, &remoteAgents)
if e != nil {
return e
return req, e
}

// get source agent alias name
Expand All @@ -81,59 +104,79 @@ func Transfer(agent, src, dst string) error {
if !ok {
log.Println("Unable to resolve destination", dst)
log.Println("Map of known agents", remoteAgents)
return fmt.Errorf("Unknown destination")
return req, fmt.Errorf("Unknown destination")
}
req = UserRequest{SrcUrl: srcUrl, SrcAlias: srcAlias, SrcFile: srcFile, DstUrl: dstUrl, DstAlias: dstAlias, DstFile: dstFile, Transfer: transfer, Upload: upload}
log.Println(req.String())
return req, nil
}

if transfer {
log.Println("### Transfer", srcAlias, srcUrl, srcFile, "=>", dstAlias, dstUrl, dstFile)
// helper function to perform user request transfer
func transfer(req UserRequest) error {

// Read data from source agent
url = fmt.Sprintf("%s/files?pattern=%s", srcUrl, srcFile)
resp = utils.FetchResponse(url, []byte{})
if resp.Error != nil {
return resp.Error
}
var files []string
err := json.Unmarshal(resp.Data, &files)
if err != nil {
return err
}
// Read data from source agent
url := fmt.Sprintf("%s/files?pattern=%s", req.SrcUrl, req.SrcFile)
resp := utils.FetchResponse(url, []byte{})
if resp.Error != nil {
return resp.Error
}
var files []string
err := json.Unmarshal(resp.Data, &files)
if err != nil {
return err
}

// form transfer request
var requests []model.TransferRequest
for _, fname := range files {
ts := time.Now().Unix()
requests = append(requests, model.TransferRequest{SrcUrl: srcUrl, SrcAlias: srcAlias, DstUrl: dstUrl, DstAlias: dst, File: fname, TimeStamp: ts})
}
// form transfer request
var requests []model.TransferRequest
for _, fname := range files {
ts := time.Now().Unix()
transferCollection := model.TransferCollection{TimeStamp: ts, Requests: requests}
requests = append(requests, model.TransferRequest{SrcUrl: req.SrcUrl, SrcAlias: req.SrcAlias, DstUrl: req.DstUrl, DstAlias: req.DstAlias, File: fname, TimeStamp: ts})
}
ts := time.Now().Unix()
transferCollection := model.TransferCollection{TimeStamp: ts, Requests: requests}

url = fmt.Sprintf("%s/request", srcUrl)
d, e := json.Marshal(transferCollection)
if e != nil {
return e
}
resp = utils.FetchResponse(url, d)
return resp.Error
url = fmt.Sprintf("%s/request", req.SrcUrl)
d, e := json.Marshal(transferCollection)
if e != nil {
return e
}
resp = utils.FetchResponse(url, d)
return resp.Error
}

if upload {
log.Println("### Upload", src, "to site", dstAlias, dstUrl, "as", dstFile)
data, err := ioutil.ReadFile(srcFile)
if err != nil {
return err
}
hash, bytes := utils.Hash(data)
d := "/a/b/c" // dataset name
b := "123" // block name
transferData := model.TransferData{File: dstFile, Dataset: d, Block: b, SrcUrl: srcUrl, SrcAlias: srcAlias, DstUrl: dstUrl, DstAlias: dstAlias, Data: data, Hash: hash, Bytes: bytes}
url = fmt.Sprintf("%s/transfer", dstUrl)
data, err = json.Marshal(transferData)
if err != nil {
return err
}
resp = utils.FetchResponse(url, data)
return resp.Error
// helper function to perform user request upload to the agent
func upload(req UserRequest) error {
data, err := ioutil.ReadFile(req.SrcFile)
if err != nil {
return err
}
hash, bytes := utils.Hash(data)
d := "/a/b/c" // dataset name
b := "123" // block name
transferData := model.TransferData{File: req.DstFile, Dataset: d, Block: b, SrcUrl: req.SrcUrl, SrcAlias: req.SrcAlias, DstUrl: req.DstUrl, DstAlias: req.DstAlias, Data: data, Hash: hash, Bytes: bytes}
url := fmt.Sprintf("%s/transfer", req.DstUrl)
data, err = json.Marshal(transferData)
if err != nil {
return err
}
resp := utils.FetchResponse(url, data)
return resp.Error
}

// Transfer client function is responsible to initiate transfer request from
// source to destination.
func Transfer(agent, src, dst string) error {
req, err := parse(agent, src, dst)
if err != nil {
return err
}

if req.Transfer {
return transfer(req)
}

if req.Upload {
return upload(req)
}

return fmt.Errorf("Unable to understand client request, src=%v to dst=%v", src, dst)
Expand All @@ -142,6 +185,6 @@ func Transfer(agent, src, dst string) error {
// Status function provides status about given agent
func Status(agent string) error {
resp := utils.FetchResponse(agent+"/status", []byte{})
log.Println("### Status", agent, string(resp.Data))
log.Println("Status", agent, string(resp.Data))
return resp.Error
}
9 changes: 4 additions & 5 deletions model/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
_ "github.com/mattn/go-sqlite3"
)

// global pointer to DB
// DB is global pointer to sql database object, it is initialized once when server starts
var DB *sql.DB

func check(msg string, err error) {
Expand Down Expand Up @@ -54,9 +54,8 @@ func (c *Catalog) Dump() []byte {
log.Println("ERROR c.Dump", err)
}
return out
} else {
log.Println("Catalog Dump method is not implemented yet for", c.Type)
}
log.Println("Catalog Dump method is not implemented yet for", c.Type)
return nil

}
Expand Down Expand Up @@ -156,7 +155,7 @@ func (c *Catalog) Files(pattern string) []string {
// fetch data from DB
rows, err := DB.Query(stm, pattern)
if err != nil {
log.Println("ERROR DB.Query, query='%s' error=%v", stm, err)
log.Printf("ERROR DB.Query, query='%s' error=%v\n", stm, err)
return files
}
defer rows.Close()
Expand Down Expand Up @@ -196,7 +195,7 @@ func (c *Catalog) FileInfo(fileEntry string) CatalogEntry {
// fetch data from DB
rows, err := DB.Query(stm, fileEntry)
if err != nil {
log.Println("ERROR DB.Query, query='%s' error=%v", stm, err)
log.Printf("ERROR DB.Query, query='%s' error=%v\n", stm, err)
return CatalogEntry{}
}
defer rows.Close()
Expand Down
5 changes: 1 addition & 4 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,10 @@ func RegisterProtocolHandler(w http.ResponseWriter, r *http.Request) {
_protocol = protocolParams.Protocol
_backend = protocolParams.Backend
_tool = protocolParams.Tool
log.Printf("INFO RegisterHandler switched to protocol=%s backend=%s tool=%s\n", _protocol, _backend, _tool)
log.Printf("INFO RegisterProtocolHandler switched to protocol=%s backend=%s tool=%s\n", _protocol, _backend, _tool)
w.WriteHeader(http.StatusOK)
return
} else {
log.Println("ERROR RegisterHandler", err)
}
w.WriteHeader(http.StatusInternalServerError)
}

// RequestHandler initiate transfer work for given request
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type AgentInfo struct {
Alias string
}

// ProtocolInfo data type
// AgentProtocol data type
type AgentProtocol struct {
Protocol string `json:"protocol"` // protocol name, e.g. srmv2
Backend string `json:"backend"` // backend storage end-point, e.g. srm://cms-srm.cern.ch:8443/srm/managerv2?SFN=
Expand Down
2 changes: 1 addition & 1 deletion utils/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func List2Set(arr []string) []string {
return out
}

// Provides a list of host IPs
// HostIP provides a list of host IPs
func HostIP() []string {
var out []string
addrs, err := net.InterfaceAddrs()
Expand Down

0 comments on commit 341c4ee

Please sign in to comment.