Skip to content
This repository has been archived by the owner on Sep 17, 2024. It is now read-only.

Update fetchBeatsBinary to be reused in elastic-agent-poc #1984

Merged
merged 10 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
//nolint:unused
var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))

type DownloadRequest struct {
URL string
DownloadPath string
UnsanitizedFilePath string
}

// GetArchitecture retrieves if the underlying system platform is arm64 or amd64
func GetArchitecture() string {
arch, present := os.LookupEnv("GOARCH")
Expand All @@ -40,36 +46,40 @@ func GetArchitecture() string {
// DownloadFile will download a url and store it in a temporary path.
// It writes to the destination file as it downloads it, without
// loading the entire file into memory.
func DownloadFile(url string) (string, error) {
tempParentDir := filepath.Join(os.TempDir(), uuid.NewString())
internalio.MkdirAll(tempParentDir)
func DownloadFile(downloadRequest *DownloadRequest) error {
var filePath string
if downloadRequest.DownloadPath == "" {
tempParentDir := filepath.Join(os.TempDir(), uuid.NewString())
internalio.MkdirAll(tempParentDir)
filePath = filepath.Join(tempParentDir, uuid.NewString())
narph marked this conversation as resolved.
Show resolved Hide resolved
} else {
filePath = filepath.Join(downloadRequest.DownloadPath, uuid.NewString())
}

tempFile, err := os.Create(filepath.Join(tempParentDir, uuid.NewString()))
tempFile, err := os.Create(filePath)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"url": url,
"url": downloadRequest.URL,
}).Error("Error creating file")
return "", err
return err
}
defer tempFile.Close()

filepathFull := tempFile.Name()

downloadRequest.UnsanitizedFilePath = tempFile.Name()
exp := GetExponentialBackOff(3)

retryCount := 1
var fileReader io.ReadCloser

download := func() error {
resp, err := http.Get(url)
resp, err := http.Get(downloadRequest.URL)
if err != nil {
log.WithFields(log.Fields{
"elapsedTime": exp.GetElapsedTime(),
"error": err,
"path": filepathFull,
"path": downloadRequest.UnsanitizedFilePath,
"retry": retryCount,
"url": url,
"url": downloadRequest.URL,
}).Warn("Could not download the file")

retryCount++
Expand All @@ -80,8 +90,8 @@ func DownloadFile(url string) (string, error) {
log.WithFields(log.Fields{
"elapsedTime": exp.GetElapsedTime(),
"retries": retryCount,
"path": filepathFull,
"url": url,
"path": downloadRequest.UnsanitizedFilePath,
"url": downloadRequest.URL,
}).Trace("File downloaded")

fileReader = resp.Body
Expand All @@ -90,30 +100,30 @@ func DownloadFile(url string) (string, error) {
}

log.WithFields(log.Fields{
"url": url,
"path": filepathFull,
"url": downloadRequest.URL,
"path": downloadRequest.UnsanitizedFilePath,
}).Trace("Downloading file")

err = backoff.Retry(download, exp)
if err != nil {
return "", err
return err
}
defer fileReader.Close()

_, err = io.Copy(tempFile, fileReader)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"url": url,
"path": filepathFull,
"url": downloadRequest.URL,
"path": downloadRequest.UnsanitizedFilePath,
}).Error("Could not write file")

return filepathFull, err
return err
}

_ = os.Chmod(tempFile.Name(), 0666)

return filepathFull, nil
return nil
}

// IsCommit returns true if the string matches commit format
Expand Down
9 changes: 7 additions & 2 deletions internal/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
)

func TestDownloadFile(t *testing.T) {
f, err := DownloadFile("https://www.elastic.co/robots.txt")
var dRequest = DownloadRequest{
URL: "https://www.elastic.co/robots.txt",
DownloadPath: "",
}
err := DownloadFile(&dRequest)
assert.Nil(t, err)
defer os.Remove(filepath.Dir(f))
assert.NotEmpty(t, dRequest.UnsanitizedFilePath)
defer os.Remove(filepath.Dir(dRequest.UnsanitizedFilePath))
}

func TestGetArchitecture(t *testing.T) {
Expand Down
64 changes: 45 additions & 19 deletions pkg/downloads/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func CheckPRVersion(version string, fallbackVersion string) string {
// FetchElasticArtifact fetches an artifact from the right repository, returning binary name, path and error
func FetchElasticArtifact(ctx context.Context, artifact string, version string, os string, arch string, extension string, isDocker bool, xpack bool) (string, string, error) {
binaryName := buildArtifactName(artifact, version, os, arch, extension, isDocker)
binaryPath, err := fetchBeatsBinary(ctx, binaryName, artifact, version, utils.TimeoutFactor, xpack)
binaryPath, err := FetchBeatsBinary(ctx, binaryName, artifact, version, utils.TimeoutFactor, xpack, "", false)
if err != nil {
log.WithFields(log.Fields{
"artifact": artifact,
Expand All @@ -109,7 +109,7 @@ func GetCommitVersion(version string) string {
// i.e. GetElasticArtifactURL("elastic-agent-$VERSION-$ARCH.deb", "elastic-agent", "$VERSION")
// i.e. GetElasticArtifactURL("elastic-agent-$VERSION-x86_64.rpm", "elastic-agent","$VERSION")
// i.e. GetElasticArtifactURL("elastic-agent-$VERSION-linux-$ARCH.tar.gz", "elastic-agent","$VERSION")
func GetElasticArtifactURL(artifactName string, artifact string, version string) (string, error) {
func GetElasticArtifactURL(artifactName string, artifact string, version string) (string, string, error) {
narph marked this conversation as resolved.
Show resolved Hide resolved
exp := utils.GetExponentialBackOff(time.Minute)

retryCount := 1
Expand Down Expand Up @@ -161,7 +161,7 @@ func GetElasticArtifactURL(artifactName string, artifact string, version string)

err := backoff.Retry(apiStatus, exp)
if err != nil {
return "", err
return "", "", err
}

jsonParsed, err := gabs.ParseJSON([]byte(body))
Expand All @@ -171,7 +171,7 @@ func GetElasticArtifactURL(artifactName string, artifact string, version string)
"artifactName": artifactName,
"version": tmpVersion,
}).Error("Could not parse the response body for the artifact")
return "", err
return "", "", err
}

log.WithFields(log.Fields{
Expand All @@ -191,8 +191,9 @@ func GetElasticArtifactURL(artifactName string, artifact string, version string)
// we need to get keys with dots using Search instead of Path
downloadObject := packagesObject.Search(artifactName)
downloadURL := downloadObject.Path("url").Data().(string)
downloadshaURL := downloadObject.Path("sha_url").Data().(string)

return downloadURL, nil
return downloadURL, downloadshaURL, nil
}

// GetElasticArtifactVersion returns the current version:
Expand Down Expand Up @@ -347,12 +348,12 @@ func buildArtifactName(artifact string, artifactVersion string, OS string, arch

}

// fetchBeatsBinary it downloads the binary and returns the location of the downloaded file
// FetchBeatsBinary it downloads the binary and returns the location of the downloaded file
// If the environment variable BEATS_LOCAL_PATH is set, then the artifact
// to be used will be defined by the local snapshot produced by the local build.
// Else, if the environment variable GITHUB_CHECK_SHA1 is set, then the artifact
// to be downloaded will be defined by the snapshot produced by the Beats CI for that commit.
func fetchBeatsBinary(ctx context.Context, artifactName string, artifact string, version string, timeoutFactor int, xpack bool) (string, error) {
func FetchBeatsBinary(ctx context.Context, artifactName string, artifact string, version string, timeoutFactor int, xpack bool, downloadPath string, downloadSHAFIle bool) (string, error) {
narph marked this conversation as resolved.
Show resolved Hide resolved
if BeatsLocalPath != "" {
span, _ := apm.StartSpanOptions(ctx, "Fetching Beats binary", "beats.local.fetch-binary", apm.SpanOptions{
Parent: apm.SpanFromContext(ctx).TraceContext(),
Expand All @@ -376,6 +377,11 @@ func fetchBeatsBinary(ctx context.Context, artifactName string, artifact string,
}

handleDownload := func(URL string) (string, error) {
name := artifactName
downloadRequest := utils.DownloadRequest{
DownloadPath: downloadPath,
URL: URL,
}
span, _ := apm.StartSpanOptions(ctx, "Fetching Beats binary", "beats.url.fetch-binary", apm.SpanOptions{
Parent: apm.SpanFromContext(ctx).TraceContext(),
})
Expand All @@ -389,28 +395,31 @@ func fetchBeatsBinary(ctx context.Context, artifactName string, artifact string,
return val, nil
}

filePathFull, err := utils.DownloadFile(URL)
err := utils.DownloadFile(&downloadRequest)
if err != nil {
return filePathFull, err
return downloadRequest.UnsanitizedFilePath, err
}

if strings.HasSuffix(URL, ".sha512") {
name = fmt.Sprintf("%s.sha512", name)
}
// use artifact name as file name to avoid having URL params in the name
sanitizedFilePath := filepath.Join(path.Dir(filePathFull), artifactName)
err = os.Rename(filePathFull, sanitizedFilePath)
sanitizedFilePath := filepath.Join(path.Dir(downloadRequest.UnsanitizedFilePath), name)
err = os.Rename(downloadRequest.UnsanitizedFilePath, sanitizedFilePath)
if err != nil {
log.WithFields(log.Fields{
"fileName": filePathFull,
"fileName": downloadRequest.UnsanitizedFilePath,
"sanitizedFileName": sanitizedFilePath,
}).Warn("Could not sanitize downloaded file name. Keeping old name")
sanitizedFilePath = filePathFull
sanitizedFilePath = downloadRequest.UnsanitizedFilePath
}

binariesCache[URL] = sanitizedFilePath

return sanitizedFilePath, nil
}

var downloadURL string
var downloadURL, downloadShaURL string
var err error

useCISnapshots := GithubCommitSha1 != ""
Expand All @@ -422,24 +431,41 @@ func fetchBeatsBinary(ctx context.Context, artifactName string, artifact string,

log.Debugf("Using CI snapshots for %s", artifact)

bucket, prefix, object := getGCPBucketCoordinates(artifactName, artifact)

maxTimeout := time.Duration(timeoutFactor) * time.Minute

bucket, prefix, object := getGCPBucketCoordinates(artifactName, artifact)

downloadURL, err = getObjectURLFromBucket(bucket, prefix, object, maxTimeout)
if err != nil {
return "", err
}
downloadLocation, err := handleDownload(downloadURL)

// check if sha file should be downloaded, else return
if downloadSHAFIle == false {
narph marked this conversation as resolved.
Show resolved Hide resolved
return downloadLocation, err
}

bucket, prefix, object = getGCPBucketCoordinates(fmt.Sprintf("%s.sha512", artifactName), artifact)
downloadURL, err = getObjectURLFromBucket(bucket, prefix, object, maxTimeout)
if err != nil {
return "", err
}
return handleDownload(downloadURL)
}

downloadURL, err = GetElasticArtifactURL(artifactName, artifact, version)
downloadURL, downloadShaURL, err = GetElasticArtifactURL(artifactName, artifact, version)
if err != nil {
return "", err
}

return handleDownload(downloadURL)
downloadLocation, err := handleDownload(downloadURL)
if err != nil {
return "", err
}
if downloadSHAFIle == true && downloadShaURL != "" {
narph marked this conversation as resolved.
Show resolved Hide resolved
downloadLocation, err = handleDownload(downloadShaURL)
}
return downloadLocation, err
}

func getBucketSearchNextPageParam(jsonParsed *gabs.Container) string {
Expand Down
Loading