Skip to content

Commit

Permalink
Merge pull request #93 from Cerfoglg/code-comments
Browse files Browse the repository at this point in the history
Added comments to the code
  • Loading branch information
VincenzoFerme authored Nov 3, 2016
2 parents 382ce3f + 81adee2 commit 09e844d
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 7 deletions.
10 changes: 9 additions & 1 deletion dbms/mysql/src/cloud/benchflow/collectors/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"encoding/json"
)

//Structure of the response message for the API requests
type Response struct {
Status string
Message string
}

//Function to marshall and write the response to the API requests
func writeJSONResponse(w http.ResponseWriter, status string, message string) {
response := Response{status, message}
js, err := json.Marshal(response)
Expand All @@ -28,7 +30,9 @@ func writeJSONResponse(w http.ResponseWriter, status string, message string) {
w.Write(js)
}

//Primary function that is called when a request is received from a client on /store
func backupHandler(w http.ResponseWriter, r *http.Request) {
//If request method is not PUT then respond with method not allowed
if r.Method != "PUT" {
w.WriteHeader(405)
return
Expand All @@ -54,9 +58,11 @@ func backupHandler(w http.ResponseWriter, r *http.Request) {
}
outfile.Close()

//Gzip and send to Minio
minio.GzipFile("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql")
minio.SendGzipToMinio("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), databaseMinioKey+"/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))

//Remove temp file
err = os.Remove("/app/"+os.Getenv("MYSQL_DB_NAME")+"_backup.sql.gz")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -68,7 +74,7 @@ func backupHandler(w http.ResponseWriter, r *http.Request) {
ev := os.Getenv("TABLE_NAMES")
tables := strings.Split(ev, ",")

// Save Table sizes
// Save Table sizes, sending a mysql query and parsing it with sed into a csv file
cmd = exec.Command("mysql", "-h", os.Getenv("MYSQL_HOST"), "-P", os.Getenv("MYSQL_PORT"), "-u", os.Getenv("MYSQL_USER"), "-p" + os.Getenv("MYSQL_USER_PASSWORD"), "-e", "USE "+os.Getenv("MYSQL_DB_NAME")+"; select table_schema AS Db, sum(data_length+index_length) AS Bytes from information_schema.tables where table_schema='"+os.Getenv("MYSQL_DB_NAME")+"' group by 1;")
cmd2 := exec.Command("sed", "s/\\t/\",\"/g;s/^/\"/;s/$/\"/;s/\\n//g")
outfile, err = os.Create("/app/database_table_sizes_backup.csv")
Expand Down Expand Up @@ -159,8 +165,10 @@ func backupHandler(w http.ResponseWriter, r *http.Request) {
return
}
}
//Signal on Kafka
kafka.SignalOnKafka(databaseMinioKey, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), "mysql", "mysql", "host", os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC"))

//Write response
writeJSONResponse(w, "SUCCESS", "The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
fmt.Println("The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
}
Expand Down
21 changes: 21 additions & 0 deletions environment/logs/src/cloud/benchflow/collectors/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"strconv"
)

//Structure of the response message for the API requests
type Response struct {
Status string
Message string
}

//Function to marshall and write the response to the API requests
func writeJSONResponse(w http.ResponseWriter, status string, message string) {
response := Response{status, message}
js, err := json.Marshal(response)
Expand All @@ -31,23 +33,29 @@ func writeJSONResponse(w http.ResponseWriter, status string, message string) {
w.Write(js)
}

//The variable containg the object representing the Docker client
var client docker.Client

//Primary function that is called when a request is received from a client on /store
func storeData(w http.ResponseWriter, r *http.Request) {
//If request method is not PUT then respond with method not allowed
if r.Method != "PUT" {
w.WriteHeader(405)
return
}
//Vars for the composed minio keys, container ids and names
composedMinioKey := ""
composedContainerIds := ""
composedContainerNames := ""

//Retrieve host id
info, err := client.Info()
if err != nil {
panic(err)
}
hostID := info.ID

//Retrieve and parse query "since" for the starting point of the logs, if available
since := r.FormValue("since")
var sinceInt int64
sinceInt = 0
Expand All @@ -60,18 +68,24 @@ func storeData(w http.ResponseWriter, r *http.Request) {
}
}

//Create Docker client
client = createDockerClient()

//Get container names/ids to collect from
contEV := os.Getenv("CONTAINERS")
conts := strings.Split(contEV, ",")

//Iterate over containers and collect data from each
for _, container := range conts {
//Inspect the container via the Docker client
inspect, err := client.InspectContainer(container)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}

//Create temp files for logs, both stdout and stderr and write logs into them
fo, _ := os.Create(inspect.Name + "_tmp")
writerOut := bufio.NewWriter(fo)
fe, _ := os.Create(inspect.Name + "_tmp_err")
Expand All @@ -95,28 +109,35 @@ func storeData(w http.ResponseWriter, r *http.Request) {
fo.Close()
fe.Close()

//Gzip the files
minio.GzipFile(inspect.Name+"_tmp")
minio.GzipFile(inspect.Name+"_tmp_err")

//Generate the minio key
minioKey := minio.GenerateKey(inspect.Name, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), os.Getenv("BENCHFLOW_CONTAINER_NAME"), os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("BENCHFLOW_DATA_NAME"))

//Append minio keys and containers to the combined vars
composedMinioKey = composedMinioKey+minioKey+","
composedContainerIds = composedContainerIds+inspect.ID+","
cName := strings.Split(container, "_")[0]
composedContainerNames = composedContainerNames+cName+","

//Send files to Minio
minio.SendGzipToMinio(inspect.Name+"_tmp.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+".gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
minio.SendGzipToMinio(inspect.Name+"_tmp_err.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+"_err.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))
}
//Trim composed strings of the trailing coma, signal Kafka
composedMinioKey = strings.TrimRight(composedMinioKey, ",")
composedContainerIds = strings.TrimRight(composedContainerIds, ",")
composedContainerNames = strings.TrimRight(composedContainerNames, ",")
kafka.SignalOnKafka(composedMinioKey, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), composedContainerIds, composedContainerNames, hostID, os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC"))

//Write response
writeJSONResponse(w, "SUCCESS", "The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
fmt.Println("The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
}

//Function to create the Docker client using the Docker socket (shared when the container was run)
func createDockerClient() docker.Client {
endpoint := "unix:///var/run/docker.sock"
client, err := docker.NewClient(endpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"encoding/json"
)

//Structure of the response message for the API requests
type Response struct {
Status string
Message string
}

//Function to marshall and write the response to the API requests
func writeJSONResponse(w http.ResponseWriter, status string, message string) {
response := Response{status, message}
js, err := json.Marshal(response)
Expand All @@ -29,6 +31,7 @@ func writeJSONResponse(w http.ResponseWriter, status string, message string) {
w.Write(js)
}

//Function to create the Docker client object to communicate with Docker, using the Docker socket (shared with the container)
func createDockerClient() docker.Client {
endpoint := "unix:///var/run/docker.sock"
client, err := docker.NewClient(endpoint)
Expand All @@ -38,36 +41,51 @@ func createDockerClient() docker.Client {
return *client
}

//Primary function that is called when a request is received from a client on /store
func storeData(w http.ResponseWriter, r *http.Request) {
//If request method is not PUT then respond with method not allowed
if r.Method != "PUT" {
w.WriteHeader(405)
return
}

//Create Docker client
client := createDockerClient()

//Get Docker info (host properties)
info, err := client.Info()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}

//Get Docker version data
version, err := client.Version()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Println(err)
return
}

//Get list of containers to collect from
contEV := os.Getenv("CONTAINERS")
conts := strings.Split(contEV, ",")

//Initiating variables for lists of keys, container ids and names
composedMinioKey := ""
composedContainerIds := ""
composedContainerNames := ""

//Host id
hostID := info.ID

//For all containers observed retrieve properties
for _, each := range conts {
//Var to store the JSONs obtained from the API
var e docker.Env

//Creating temp files
foInspect, err := os.Create("/app/"+each+"_inspect_tmp")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -87,6 +105,7 @@ func storeData(w http.ResponseWriter, r *http.Request) {
return
}

//Inspecting container (container properties) and writing the data in the files
inspect, err := client.InspectContainer(each)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -102,22 +121,26 @@ func storeData(w http.ResponseWriter, r *http.Request) {
e.SetJSON("version", version)
foVersion.Write([]byte(e.Get("version")))

//Gzipping files
minio.GzipFile("/app/"+each+"_inspect_tmp")
minio.GzipFile("/app/"+each+"_info_tmp")
minio.GzipFile("/app/"+each+"_version_tmp")

//Generating Minio key, adding it to the list of minio keys, same for container ids and names
minioKey := minio.GenerateKey(each, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), os.Getenv("BENCHFLOW_CONTAINER_NAME"), os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("BENCHFLOW_DATA_NAME"))
composedMinioKey = composedMinioKey+minioKey+","
composedContainerIds = composedContainerIds+inspect.ID+","
cName := strings.Split(each, "_")[0]
composedContainerNames = composedContainerIds+cName+","

//Sending files to Minio
minio.SendGzipToMinio("/app/"+each+"_inspect_tmp.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+"_inspect.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))

minio.SendGzipToMinio("/app/"+each+"_info_tmp.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+"_info.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))

minio.SendGzipToMinio("/app/"+each+"_version_tmp.gz", os.Getenv("MINIO_HOST"), os.Getenv("MINIO_PORT"), minioKey+"_version.gz", os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_SECRETACCESSKEY"))

//Removing temp files
err = os.Remove("/app/"+each+"_inspect_tmp.gz")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -137,12 +160,15 @@ func storeData(w http.ResponseWriter, r *http.Request) {
return
}
}
//Trimming trailing comas from vars
composedMinioKey = strings.TrimRight(composedMinioKey, ",")
composedContainerIds = strings.TrimRight(composedContainerIds, ",")
composedContainerNames = strings.TrimRight(composedContainerNames, ",")

//Signalling on Kafka
kafka.SignalOnKafka(composedMinioKey, os.Getenv("BENCHFLOW_TRIAL_ID"), os.Getenv("BENCHFLOW_EXPERIMENT_ID"), composedContainerIds, composedContainerNames, hostID, os.Getenv("BENCHFLOW_COLLECTOR_NAME"), os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC"))

//Writing response
writeJSONResponse(w, "SUCCESS", "The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
fmt.Println("The collection was performed successfully for "+os.Getenv("BENCHFLOW_TRIAL_ID"))
}
Expand Down
Loading

0 comments on commit 09e844d

Please sign in to comment.