Skip to content

Commit

Permalink
Merge pull request #89 from Cerfoglg/Collectors-fixes
Browse files Browse the repository at this point in the history
Using JSON responses, reenabled mysqldump for sql dumps
  • Loading branch information
VincenzoFerme authored Aug 23, 2016
2 parents d22bd3e + b1c2423 commit 382ce3f
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 131 deletions.
153 changes: 79 additions & 74 deletions dbms/mysql/src/cloud/benchflow/collectors/mysql.go
Original file line number Diff line number Diff line change
@@ -1,90 +1,100 @@
package main

import (
"fmt"
"net/http"
"os"
"fmt"
"os/exec"
"github.com/benchflow/commons/minio"
"github.com/benchflow/commons/kafka"
"log"
"strings"
"encoding/json"
)

type Response struct {
Status string
Message string
}

func writeJSONResponse(w http.ResponseWriter, status string, message string) {
response := Response{status, message}
js, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
}

func backupHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
w.WriteHeader(405)
return
}
// Generating key for Minio
//databaseMinioKey := minio.GenerateKey(os.Getenv("MYSQL_DB_NAME"))
databaseMinioKey := minio.GenerateKey(os.Getenv("MYSQL_DB_NAME"), os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), os.Getenv("BENCHFLOW_CONTAINER_NAME"), os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("BENCHFLOW_DATA_NAME"))

log.Printf("Minio Key: " + databaseMinioKey)

// Save whole database as mysqldump
cmd := exec.Command("mysqldump", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), os.Getenv("MYSQL_DB_NAME"))
outfile, err := os.Create("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
cmd.Stdout = outfile
cmd.Start()
err = cmd.Wait()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
outfile.Close()

minio.GzipFile("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql")
minio.SendGzipToMinio("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), databaseMinioKey+"/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))

err = os.Remove("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql.gz")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}

// Retrieve table names
ev := os.Getenv("TABLE_NAMES")
tables := strings.Split(ev, ",")

// cmdd := exec.Command("touch", "/app/backup.csv")
// cmdd.Run()
// cmdd.Wait()
// cmdd = exec.Command("chmod", "777", "/app/backup.csv")
// cmdd.Run()
// cmdd.Wait()
/*
for _,each := range tables {
cmd := exec.Command("mysqldump", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), os.Getenv("MYSQL_DB_NAME"), each)
outfile, err := os.Create("/app/"+each+"_backup.sql")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
}
cmd.Stdout = outfile
err = cmd.Start()
cmd.Wait()
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
}
outfile.Close()
minio.GzipFile("/app/"+each+"_backup.sql")
callMinioClient("/app/"+each+"_backup.sql.gz", os.Getenv("MINIO_ALIAS"), databaseMinioKey+"/"+each+".sql.gz")
err = os.Remove("/app/"+each+"_backup.sql.gz")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
}
}
*/

// Save Table sizes
cmd := exec.Command("mysql", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), "-e", "USE "+os.Getenv("MYSQL_DB_NAME")+"; select table_schema AS Db, sum(data_length+index_length) AS Bytes from information_schema.tables where table_schema='"+os.Getenv("MYSQL_DB_NAME")+"' group by 1;")
cmd = exec.Command("mysql", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), "-e", "USE "+os.Getenv("MYSQL_DB_NAME")+"; select table_schema AS Db, sum(data_length+index_length) AS Bytes from information_schema.tables where table_schema='"+os.Getenv("MYSQL_DB_NAME")+"' group by 1;")
cmd2 := exec.Command("sed", "s/\\t/\",\"/g;s/^/\"/;s/$/\"/;s/\\n//g")
outfile, err := os.Create("/app/database_table_sizes_backup.csv")
// outfile, err := os.Open("/app/backup.csv")
outfile, err = os.Create("/app/database_table_sizes_backup.csv")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
cmd2.Stdin, _ = cmd.StdoutPipe()
cmd2.Stdout = outfile
err = cmd2.Start()
cmd.Run()
cmd2.Wait()
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
outfile.Close()
minio.GzipFile("/app/database_table_sizes_backup.csv")
//callMinioClient("/app/database_table_sizes_backup.csv.gz", os.Getenv("MINIO_ALIAS"), databaseMinioKey+"/database_table_sizes.csv.gz")
minio.SendGzipToMinio("/app/database_table_sizes_backup.csv.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), databaseMinioKey+"/database_table_sizes.csv.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
//minio.StoreOnMinio("backup.csv.gz", "runs", databaseMinioKey+each+".csv.gz")
err = os.Remove("/app/database_table_sizes_backup.csv.gz")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}


Expand All @@ -93,71 +103,66 @@ func backupHandler(w http.ResponseWriter, r *http.Request) {
cmd := exec.Command("mysql", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), "-e", "USE "+os.Getenv("MYSQL_DB_NAME")+"; SELECT * FROM "+each+";")
cmd2 := exec.Command("sed", "s/\\t/\",\"/g;s/^/\"/;s/$/\"/;s/\\n//g")
outfile, err := os.Create("/app/"+each+"_backup.csv")
// outfile, err := os.Open("/app/backup.csv")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
cmd2.Stdin, _ = cmd.StdoutPipe()
cmd2.Stdout = outfile
err = cmd2.Start()
cmd.Run()
cmd2.Wait()
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
outfile.Close()
minio.GzipFile("/app/"+each+"_backup.csv")
//callMinioClient("/app/"+each+"_backup.csv.gz", os.Getenv("MINIO_ALIAS"), databaseMinioKey+"/"+each+".csv.gz")
minio.SendGzipToMinio("/app/"+each+"_backup.csv.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), databaseMinioKey+"/"+each+".csv.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
//minio.StoreOnMinio("backup.csv.gz", "runs", databaseMinioKey+each+".csv.gz")
err = os.Remove("/app/"+each+"_backup.csv.gz")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
}

// cmdd = exec.Command("touch", "/app/backup.csv")
// cmdd.Run()
// cmdd.Wait()
// cmdd = exec.Command("chmod", "777", "/app/backup.csv")
// cmdd.Run()
// cmdd.Wait()

// Save the column types of the tables
for _, each := range tables {
cmd := exec.Command("mysql", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), "-e", "USE "+os.Getenv("MYSQL_DB_NAME")+"; SHOW FIELDS FROM "+each+";")
cmd2 := exec.Command("sed", "s/\\t/\",\"/g;s/^/\"/;s/$/\"/;s/\\n//g")
outfile, err := os.Create("/app/"+each+"_backup_schema.csv")
// outfile, err := os.Open("/app/backup.csv")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
cmd2.Stdin, _ = cmd.StdoutPipe()
cmd2.Stdout = outfile
err = cmd2.Start()
cmd.Run()
cmd2.Wait()
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
outfile.Close()
minio.GzipFile("/app/"+each+"_backup_schema.csv")
//callMinioClient("/app/"+each+"_backup_schema.csv.gz", os.Getenv("MINIO_ALIAS"), databaseMinioKey+"/"+each+"_schema.csv.gz")
minio.SendGzipToMinio("/app/"+each+"_backup_schema.csv.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), databaseMinioKey+"/"+each+"_schema.csv.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
//minio.StoreOnMinio("backup.csv.gz", "runs", databaseMinioKey+each+"_schema.csv.gz")
err = os.Remove("/app/"+each+"_backup_schema.csv.gz")
if err != nil {
fmt.Fprintf(w, "ERROR: %s", err)
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
}
kafka.SignalOnKafka(databaseMinioKey, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), "mysql", "mysql", "host", os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC"))
fmt.Fprintf(w, "SUCCESS")

writeJSONResponse(w, "SUCCESS", "The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
fmt.Println("The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
}

func main() {
Expand Down
46 changes: 33 additions & 13 deletions environment/logs/src/cloud/benchflow/collectors/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,35 @@ package main

import (
"bufio"
//"fmt"
"fmt"
"github.com/fsouza/go-dockerclient"
"github.com/benchflow/commons/minio"
"github.com/benchflow/commons/kafka"
"log"
"net/http"
"encoding/json"
"os"
"strings"
"strconv"
)

type Response struct {
Status string
Message string
}

func writeJSONResponse(w http.ResponseWriter, status string, message string) {
response := Response{status, message}
js, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
}

var client docker.Client

func storeData(w http.ResponseWriter, r *http.Request) {
Expand All @@ -36,7 +54,9 @@ func storeData(w http.ResponseWriter, r *http.Request) {
if since != "" {
sinceInt, err = strconv.ParseInt(since, 10, 64)
if err != nil {
panic(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}
}

Expand All @@ -47,8 +67,10 @@ func storeData(w http.ResponseWriter, r *http.Request) {
for _, container := range conts {
inspect, err := client.InspectContainer(container)
if err != nil {
panic(err)
}
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}

fo, _ := os.Create(inspect.Name + "_tmp")
writerOut := bufio.NewWriter(fo)
Expand All @@ -65,7 +87,9 @@ func storeData(w http.ResponseWriter, r *http.Request) {
Timestamps: true,
})
if err != nil {
log.Fatal(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}

fo.Close()
Expand All @@ -74,7 +98,6 @@ func storeData(w http.ResponseWriter, r *http.Request) {
minio.GzipFile(inspect.Name+"_tmp")
minio.GzipFile(inspect.Name+"_tmp_err")

//minioKey := minio.GenerateKey("logs.gz")
minioKey := minio.GenerateKey(inspect.Name, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), os.Getenv("BENCHFLOW_CONTAINER_NAME"), os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("BENCHFLOW_DATA_NAME"))

composedMinioKey = composedMinioKey+minioKey+","
Expand All @@ -83,26 +106,23 @@ func storeData(w http.ResponseWriter, r *http.Request) {
composedContainerNames = composedContainerNames+cName+","

minio.SendGzipToMinio(inspect.Name+"_tmp.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+".gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
//callMinioClient(container.ID+"_tmp.gz", os.Getenv("MINIO_ALIAS"), minioKey)
//minio.StoreOnMinio(container.ID+"_tmp.gz", "runs", minioKey)

//callMinioClient(container.ID+"_tmp_err.gz", os.Getenv("MINIO_ALIAS"), minioKey)
minio.SendGzipToMinio(inspect.Name+"_tmp_err.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+"_err.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
//minio.StoreOnMinio(container.ID+"_tmp_err.gz", "runs", minioKey)
//kafka.SignalOnKafka(minioKey)
}
composedMinioKey = strings.TrimRight(composedMinioKey, ",")
composedContainerIds = strings.TrimRight(composedContainerIds, ",")
composedContainerNames = strings.TrimRight(composedContainerNames, ",")
kafka.SignalOnKafka(composedMinioKey, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), composedContainerIds, composedContainerNames, hostID, os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC"))

writeJSONResponse(w, "SUCCESS", "The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
fmt.Println("The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
}

func createDockerClient() docker.Client {
endpoint := "unix:///var/run/docker.sock"
client, err := docker.NewClient(endpoint)
if err != nil {
log.Fatal(err)
}
}
return *client
}

Expand Down
Loading

0 comments on commit 382ce3f

Please sign in to comment.