Skip to content

Commit

Permalink
add local exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie Zhao committed May 19, 2021
1 parent 96dda2b commit 8bf7747
Showing 1 changed file with 75 additions and 66 deletions.
141 changes: 75 additions & 66 deletions cmd/aks-periscope/aks-periscope.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"log"
"os"
"strings"
"sync"

Expand All @@ -14,83 +15,91 @@ import (

func main() {
zipAndExportMode := true
exporter := &exporter.AzureBlobExporter{}
exporters := []interfaces.Exporter{}
clusterType := os.Getenv("CLUSTER_TYPE")

exporters = append(exporters, &exporter.AzureBlobExporter{})
if strings.EqualFold(clusterType, "connectedCluster") {
exporters = append(exporters, &exporter.LocalMachineExporter{})
}

var waitgroup sync.WaitGroup

err := utils.CreateCRD()
if err != nil {
log.Printf("Failed to create CRD: %+v", err)
}
for _, exporter := range exporters {
collectors := []interfaces.Collector{}
containerLogsCollector := collector.NewContainerLogsCollector(exporter)
collectors = append(collectors, containerLogsCollector)
systemLogsCollector := collector.NewSystemLogsCollector(exporter)
collectors = append(collectors, systemLogsCollector)
networkOutboundCollector := collector.NewNetworkOutboundCollector(5, exporter)
collectors = append(collectors, networkOutboundCollector)
ipTablesCollector := collector.NewIPTablesCollector(exporter)
collectors = append(collectors, ipTablesCollector)
nodeLogsCollector := collector.NewNodeLogsCollector(exporter)
collectors = append(collectors, nodeLogsCollector)
dnsCollector := collector.NewDNSCollector(exporter)
collectors = append(collectors, dnsCollector)
kubeObjectsCollector := collector.NewKubeObjectsCollector(exporter)
collectors = append(collectors, kubeObjectsCollector)
kubeletCmdCollector := collector.NewKubeletCmdCollector(exporter)
collectors = append(collectors, kubeletCmdCollector)
systemPerfCollector := collector.NewSystemPerfCollector(exporter)
collectors = append(collectors, systemPerfCollector)

for _, c := range collectors {
waitgroup.Add(1)
go func(c interfaces.Collector) {
log.Printf("Collector: %s, collect data\n", c.GetName())
err := c.Collect()
if err != nil {
log.Printf("Collector: %s, collect data failed: %+v\n", c.GetName(), err)
}

log.Printf("Collector: %s, export data\n", c.GetName())
err = c.Export()
if err != nil {
log.Printf("Collector: %s, export data failed: %+v\n", c.GetName(), err)
}
waitgroup.Done()
}(c)
}

collectors := []interfaces.Collector{}
containerLogsCollector := collector.NewContainerLogsCollector(exporter)
collectors = append(collectors, containerLogsCollector)
systemLogsCollector := collector.NewSystemLogsCollector(exporter)
collectors = append(collectors, systemLogsCollector)
networkOutboundCollector := collector.NewNetworkOutboundCollector(5, exporter)
collectors = append(collectors, networkOutboundCollector)
ipTablesCollector := collector.NewIPTablesCollector(exporter)
collectors = append(collectors, ipTablesCollector)
nodeLogsCollector := collector.NewNodeLogsCollector(exporter)
collectors = append(collectors, nodeLogsCollector)
dnsCollector := collector.NewDNSCollector(exporter)
collectors = append(collectors, dnsCollector)
kubeObjectsCollector := collector.NewKubeObjectsCollector(exporter)
collectors = append(collectors, kubeObjectsCollector)
kubeletCmdCollector := collector.NewKubeletCmdCollector(exporter)
collectors = append(collectors, kubeletCmdCollector)
systemPerfCollector := collector.NewSystemPerfCollector(exporter)
collectors = append(collectors, systemPerfCollector)

for _, c := range collectors {
waitgroup.Add(1)
go func(c interfaces.Collector) {
log.Printf("Collector: %s, collect data\n", c.GetName())
err := c.Collect()
if err != nil {
log.Printf("Collector: %s, collect data failed: %+v\n", c.GetName(), err)
}

log.Printf("Collector: %s, export data\n", c.GetName())
err = c.Export()
if err != nil {
log.Printf("Collector: %s, export data failed: %+v\n", c.GetName(), err)
}
waitgroup.Done()
}(c)
}

waitgroup.Wait()
waitgroup.Wait()

diagnosers := []interfaces.Diagnoser{}
diagnosers = append(diagnosers, diagnoser.NewNetworkConfigDiagnoser(dnsCollector, kubeletCmdCollector, exporter))
diagnosers = append(diagnosers, diagnoser.NewNetworkOutboundDiagnoser(networkOutboundCollector, exporter))

for _, d := range diagnosers {
waitgroup.Add(1)
go func(d interfaces.Diagnoser) {
log.Printf("Diagnoser: %s, diagnose data\n", d.GetName())
err := d.Diagnose()
if err != nil {
log.Printf("Diagnoser: %s, diagnose data failed: %+v\n", d.GetName(), err)
}

log.Printf("Diagnoser: %s, export data\n", d.GetName())
err = d.Export()
if err != nil {
log.Printf("Diagnoser: %s, export data failed: %+v\n", d.GetName(), err)
}
waitgroup.Done()
}(d)
}

diagnosers := []interfaces.Diagnoser{}
diagnosers = append(diagnosers, diagnoser.NewNetworkConfigDiagnoser(dnsCollector, kubeletCmdCollector, exporter))
diagnosers = append(diagnosers, diagnoser.NewNetworkOutboundDiagnoser(networkOutboundCollector, exporter))
waitgroup.Wait()

for _, d := range diagnosers {
waitgroup.Add(1)
go func(d interfaces.Diagnoser) {
log.Printf("Diagnoser: %s, diagnose data\n", d.GetName())
err := d.Diagnose()
if zipAndExportMode {
log.Print("Zip and export result files")
err := zipAndExport(exporter)
if err != nil {
log.Printf("Diagnoser: %s, diagnose data failed: %+v\n", d.GetName(), err)
log.Printf("Failed to zip and export result files: %+v", err)
}

log.Printf("Diagnoser: %s, export data\n", d.GetName())
err = d.Export()
if err != nil {
log.Printf("Diagnoser: %s, export data failed: %+v\n", d.GetName(), err)
}
waitgroup.Done()
}(d)
}

waitgroup.Wait()

if zipAndExportMode {
log.Print("Zip and export result files")
err := zipAndExport(exporter)
if err != nil {
log.Printf("Failed to zip and export result files: %+v", err)
}
}

Expand Down

0 comments on commit 8bf7747

Please sign in to comment.