From d927709f80fbef1ada6ce1f07d94ca674f646432 Mon Sep 17 00:00:00 2001
From: Forest Johnson
Date: Wed, 2 Oct 2019 17:23:59 -0500
Subject: [PATCH] Manually patch in the metrics and pump-restarting logic.

I ran make and it worked; that's as far as I tested.

These are the env vars we used:

METRICS_PROTOCOL=udp
METRICS_HOST_PORT=172.17.0.1:8092
METRICS_DATABASE_NAME=aws
METRIC_NAME=logspout_log_count_by_container
LOG_METRICS=0
METRICS_FLUSH_INTERVAL=10s
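These would typically be passed to the logspout container at startup. A
hypothetical invocation (the image name, socket mount, and other flags are
illustrative placeholders, not part of this patch) might look like:

    docker run -d \
      -e METRICS_PROTOCOL=udp \
      -e METRICS_HOST_PORT=172.17.0.1:8092 \
      -e METRICS_DATABASE_NAME=aws \
      -e METRIC_NAME=logspout_log_count_by_container \
      -e LOG_METRICS=0 \
      -e METRICS_FLUSH_INTERVAL=10s \
      -v /var/run/docker.sock:/var/run/docker.sock \
      gliderlabs/logspout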
---
 router/metrics.go   | 377 ++++++++++++++++++++++++++++++++++++++++++++
 router/pump.go      |  94 ++++++++---
 router/pump_test.go |  31 +++-
 3 files changed, 475 insertions(+), 27 deletions(-)
 create mode 100644 router/metrics.go

diff --git a/router/metrics.go b/router/metrics.go
new file mode 100644
index 00000000..8732e474
--- /dev/null
+++ b/router/metrics.go
@@ -0,0 +1,377 @@
+package router
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"log"
+	"math"
+	"net"
+	"net/http"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// MetricSample is one observation: a count of log lines seen for one container's pump.
+type MetricSample struct {
+	ContainerName string
+	PumpId        string
+	LogCount      int
+}
+
+// DeadPumpAlert is emitted when a pump's log stream appears to have stopped.
+type DeadPumpAlert struct {
+	PumpId  string
+	DeadFor time.Duration
+}
+
+// InfluxDbPointModel is a single point in InfluxDB line-protocol terms.
+type InfluxDbPointModel struct {
+	MeasurementName string
+	Tags            map[string]*string
+	Fields          map[string]*string
+	Timestamp       int64
+}
+
+const newlineByte = byte('\n')
+const escapeByte = byte('\\')
+const spaceByte = byte(' ')
+const commaByte = byte(',')
+const equalsByte = byte('=')
+
+const metricsChannelSize = 10000
+const deadLogStreamAlertChannelSize = 100
+const deadLogStreamThresholdFudgeFactor = float64(6)
+const metricHistorySampleCount = 20
+const metricsChannelFlushIntervalString = "100ms"
+const selfMetricBufferSizeBytes = 4096
+
+var logMetrics = "0"
+var metricsFlushIntervalString = "10s"
+var metricsProtocol = "udp"
+
+// Default docker bridge IP address (the host machine's address from a container's
+// perspective) and the default UDP port for telegraf, 8092.
+var metricsHostPort = "172.17.0.1:8092"
+
+var metricsDatabaseName = "aws"
+var metricName = "logspout_log_count_by_container"
+
+var metricsChannelFlushInterval time.Duration
+var metricsFlushInterval time.Duration
+
+var metricChannel chan MetricSample
+var deadLogStreamAlertChannel chan DeadPumpAlert
+var metricBuffer map[string]*MetricSample
+var metricHistory map[string][]MetricSample
+var lastMetricSent time.Time
+
+func init() {
+	metricsProtocol = getopt("METRICS_PROTOCOL", metricsProtocol)
+	metricsHostPort = getopt("METRICS_HOST_PORT", metricsHostPort)
+	metricsDatabaseName = getopt("METRICS_DATABASE_NAME", metricsDatabaseName)
+	metricName = getopt("METRIC_NAME", metricName)
+	logMetrics = getopt("LOG_METRICS", logMetrics)
+	metricsFlushIntervalString = getopt("METRICS_FLUSH_INTERVAL", metricsFlushIntervalString)
+
+	var err error
+	metricsChannelFlushInterval, err = time.ParseDuration(metricsChannelFlushIntervalString)
+	if err != nil {
+		panic(err)
+	}
+	metricsFlushInterval, err = time.ParseDuration(metricsFlushIntervalString)
+	if err != nil {
+		panic(err)
+	}
+	metricChannel = make(chan MetricSample, metricsChannelSize)
+	deadLogStreamAlertChannel = make(chan DeadPumpAlert, deadLogStreamAlertChannelSize)
+	lastMetricSent = time.Now()
+
+	metricBuffer = make(map[string]*MetricSample)
+	metricHistory = make(map[string][]MetricSample)
+
+	if metricsProtocol != "udp" && metricsProtocol != "http" && metricsProtocol != "https" {
+		panic(fmt.Errorf("Unsupported METRICS_PROTOCOL: %s. Supported protocols: http, https, udp", metricsProtocol))
+	}
+
+	go aggregateAndSendMetrics()
+}
+
+func checkMetricHistoryForDeadLogStreams() {
+	//bytes, _ := json.MarshalIndent(metricHistory, "", "  ")
+	//fmt.Printf("checkMetricHistoryForDeadLogStreams %s\n", string(bytes))
+	for tagValuesCSV, logStreamHistory := range metricHistory {
+
+		//fmt.Printf("tagValuesCSV: %s, checkMetricHistoryForDeadLogStreams %d >= %d ? \n", tagValuesCSV, len(logStreamHistory), metricHistorySampleCount)
+		if len(logStreamHistory) < metricHistorySampleCount {
+			continue
+		}
+
+		numberOfSamplesInARowThatHadZeroLogs := len(logStreamHistory)
+		for i := 0; i < len(logStreamHistory); i++ {
+			if logStreamHistory[i].LogCount != 0 {
+				numberOfSamplesInARowThatHadZeroLogs = i
+				break
+			}
+		}
+		//fmt.Printf("tagValuesCSV: %s, numberOfSamplesInARowThatHadZeroLogs: %d\n", tagValuesCSV, numberOfSamplesInARowThatHadZeroLogs)
+
+		// So there was at least one sample in a row with zero logs (the history is ordered
+		// newest first). Is that zero, or run of zeros, an anomaly or is it normal?
+		// To decide, we ask how many standard deviations away from the mean zero is, and
+		// compare that against a threshold that shrinks as the run of zero-log samples grows:
+		//     distanceInStdDevs > fudgeFactor / numberOfZeroSamplesInARow
+		// For example, with a fudge factor of 1.5: if zero is two standard deviations below
+		// the mean and we saw one zero-log sample in a row, then 2 > 1.5/1 is true, meaning
+		// the log stream has stopped. If zero is only one standard deviation below the mean
+		// with one zero-log sample, then 1 > 1.5/1 is false, meaning this looks normal; but
+		// if the run grows to two zero-log samples, 1 > 1.5/2 is true, indicating that the
+		// stream has stopped.
+
+		if numberOfSamplesInARowThatHadZeroLogs > 0 {
+			previousSamplesLogCountSum := 0
+			previousSamplesCount := 0
+			for i := numberOfSamplesInARowThatHadZeroLogs; i < len(logStreamHistory); i++ {
+				previousSamplesLogCountSum += logStreamHistory[i].LogCount
+				previousSamplesCount++
+			}
+			previousSamplesAverageLogCount := (float64(previousSamplesLogCountSum) / float64(previousSamplesCount))
+			sumOfSquaresOfDeviationFromMean := float64(0)
+			for i := numberOfSamplesInARowThatHadZeroLogs; i < len(logStreamHistory); i++ {
+				deviation := float64(logStreamHistory[i].LogCount) - previousSamplesAverageLogCount
+				sumOfSquaresOfDeviationFromMean += (deviation * deviation)
+			}
+			standardDeviation := math.Sqrt(sumOfSquaresOfDeviationFromMean / float64(previousSamplesCount))
+
+			distanceFromAverageToZeroInStandardDeviations := previousSamplesAverageLogCount / standardDeviation
+			logStreamIsDead := distanceFromAverageToZeroInStandardDeviations > deadLogStreamThresholdFudgeFactor/float64(numberOfSamplesInARowThatHadZeroLogs)
+			// e.g. numberOfSamplesInARowThatHadZeroLogs: 1, distanceFromAverageToZeroInStandardDeviations: 0.55, logStreamIsDead: false
+
+			// debugLog(fmt.Sprintf(
+			// 	"tagValuesCSV: %s, numberOfSamplesInARowThatHadZeroLogs: %d, distanceFromAverageToZeroInStandardDeviations: %.2f, logStreamIsDead: %t\n",
+			// 	tagValuesCSV,
+			// 	numberOfSamplesInARowThatHadZeroLogs,
+			// 	distanceFromAverageToZeroInStandardDeviations,
+			// 	logStreamIsDead,
+			// ))
+			if logStreamIsDead {
+				tagValuesSlice := strings.Split(tagValuesCSV, ",")
+				deadLogStreamAlertChannel <- DeadPumpAlert{
+					PumpId:  tagValuesSlice[0],
+					DeadFor: time.Duration(numberOfSamplesInARowThatHadZeroLogs) * metricsFlushInterval,
+				}
+			}
+		}
+
+	}
+}
+
+func aggregateAndSendMetrics() {
+
+	// Drain everything currently queued on metricChannel into metricBuffer,
+	// keyed by "PumpId,ContainerName".
+	if metricChannel != nil {
+		done := false
+		for !done {
+			select {
+			case s := <-metricChannel:
+				tagsString := fmt.Sprintf(
+					"%s,%s",
+					s.PumpId,
+					s.ContainerName,
+				)
+				if metricBuffer[tagsString] == nil {
+					s.LogCount = 1
+					metricBuffer[tagsString] = &s
+				} else {
+					b := metricBuffer[tagsString]
+					b.LogCount++
+				}
+			default:
+				// a bare break here would only exit the select, not the loop
+				done = true
+			}
+		}
+	} else {
+		log.Println("Error: metricChannel is nil!")
+	}
+
+	if time.Since(lastMetricSent) > metricsFlushInterval {
+
+		buffer := bytes.NewBuffer(make([]byte, 0, selfMetricBufferSizeBytes))
+		i := 0
+
+		for tagsString, sample := range metricBuffer {
+			_, has := metricHistory[tagsString]
+			if !has {
+				metricHistory[tagsString] = make([]MetricSample, 0, metricHistorySampleCount)
+			}
+
+			Count := fmt.Sprintf("%d", sample.LogCount)
+			metricPoint := InfluxDbPointModel{
+				MeasurementName: metricName,
+				Fields:          map[string]*string{"Count": &Count},
+				Tags: map[string]*string{
+					"ContainerName": &sample.ContainerName,
+					"PumpId":        &sample.PumpId,
+				},
+				Timestamp: time.Now().UnixNano(),
+			}
+
+			writePoint(buffer, &metricPoint, i < len(metricBuffer)-1)
+			i++
+		}
+
+		newMetricHistory := make(map[string][]MetricSample)
+
+		for tagsString, historyForTheseTags := range metricHistory {
+			sample, hasSample := metricBuffer[tagsString]
+			if !hasSample {
+				tagValues := strings.Split(tagsString, ",")
+				sample = &MetricSample{
+					PumpId:        tagValues[0],
+					ContainerName: tagValues[1],
+					LogCount:      0,
+				}
+			}
+
+			newHistoryLength := intmin(metricHistorySampleCount, len(historyForTheseTags)+1)
+			newHistory := make([]MetricSample, newHistoryLength)
+			newHistory[0] = *sample
+			for i = 1; i < newHistoryLength; i++ {
+				newHistory[i] = historyForTheseTags[i-1]
+			}
+			newMetricHistory[tagsString] = newHistory
+		}
+
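+		// At this point newMetricHistory holds, for each tag set, the newest sample at
+		// index 0 followed by up to metricHistorySampleCount-1 older samples; swap it in
+		// and scan it for log streams that appear to have gone quiet.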
+		metricHistory = newMetricHistory
+
+		checkMetricHistoryForDeadLogStreams()
+
+		metricBuffer = make(map[string]*MetricSample)
+
+		if logMetrics == "1" {
+			log.Println(string(buffer.Bytes()))
+		}
+
+		if metricsProtocol == "http" || metricsProtocol == "https" {
+			go func(buffer *bytes.Buffer) {
+				response, err := http.Post(fmt.Sprintf("%s://%s/write?db=%s", metricsProtocol, metricsHostPort, metricsDatabaseName), "text/plain", buffer)
+				if err != nil {
+					log.Printf("Error attempting to report self metrics: %+v\n", err)
+				} else if response.StatusCode < 200 || response.StatusCode >= 300 {
+					responseBodyString := ""
+					if response.Body != nil {
+						responseBody := bytes.NewBuffer(make([]byte, 0))
+						_, err = io.Copy(responseBody, response.Body)
+						if err == nil {
+							responseBodyString = string(responseBody.Bytes())
+						}
+					}
+					log.Printf("HTTP %d (%s) attempting to report self metrics: %s", response.StatusCode, response.Status, responseBodyString)
+				}
+			}(buffer)
+		} else if metricsProtocol == "udp" {
+			go func(buffer *bytes.Buffer) {
+				connection, err := net.Dial("udp", metricsHostPort)
+				if err != nil {
+					log.Printf("Error attempting to net.Dial(\"udp\", metricsHostPort) to report self metrics: %+v\n", err)
+					return
+				}
+
+				defer connection.Close()
+
+				_, err = connection.Write(buffer.Bytes())
+				if err != nil {
+					log.Printf("Error attempting to connection.Write to report self metrics: %+v\n", err)
+					return
+				}
+			}(buffer)
+		} else {
+			log.Printf("Can't send metrics because: Unsupported metric protocol: %s. Supported protocols: http, https, udp", metricsProtocol)
+		}
+
+		lastMetricSent = time.Now()
+	}
+
+	time.AfterFunc(metricsChannelFlushInterval, aggregateAndSendMetrics)
+}
+
+// writePoint serializes one point in InfluxDB line protocol, e.g.:
+//   logspout_log_count_by_container,ContainerName=web,PumpId=8dfafdbc3a40 Count=42 1570000000000000000
+func writePoint(writer *bytes.Buffer, point *InfluxDbPointModel, hasMore bool) {
+	writer.Write([]byte(point.MeasurementName))
+	tagsCount := len(point.Tags)
+	tagIndex := 0
+	if tagsCount > 0 {
+		writer.WriteByte(commaByte)
+	}
+	tagKeys := make([][]byte, 0, len(point.Tags))
+	for k := range point.Tags {
+		tagKeys = append(tagKeys, []byte(k))
+	}
+	sortedTagKeys := SortByteSlices(tagKeys)
+	for _, k := range sortedTagKeys {
+		writer.Write(k)
+		writer.WriteByte(equalsByte)
+		writer.Write([]byte(*point.Tags[string(k)]))
+		tagIndex++
+		if tagIndex < tagsCount {
+			writer.WriteByte(commaByte)
+		}
+	}
+
+	writer.WriteByte(spaceByte)
+	fieldsCount := len(point.Fields)
+	fieldIndex := 0
+	for k, v := range point.Fields {
+		writer.Write([]byte(k))
+		writer.WriteByte(equalsByte)
+		writer.Write([]byte(*v))
+		fieldIndex++
+		if fieldIndex < fieldsCount {
+			writer.WriteByte(commaByte)
+		}
+	}
+	writer.WriteByte(spaceByte)
+	writer.Write([]byte(strconv.FormatInt(point.Timestamp, 10)))
+	if hasMore {
+		writer.WriteByte(newlineByte)
+	}
+}
+
+func SortByteSlices(src [][]byte) [][]byte {
+	sorted := sortByteArrays(src)
+	sort.Sort(sorted)
+	return sorted
+}
+
+// sortByteArrays implements sort.Interface for [][]byte.
+type sortByteArrays [][]byte
+
+func (b sortByteArrays) Len() int {
+	return len(b)
+}
+
+func (b sortByteArrays) Less(i, j int) bool {
+	// the bytes package already implements comparison for []byte.
+	switch bytes.Compare(b[i], b[j]) {
+	case -1:
+		return true
+	case 0, 1:
+		return false
+	default:
+		log.Panic("not reachable: `bytes.Compare` is bounded to [-1, 1].")
+		return false
+	}
+}
+
+func (b sortByteArrays) Swap(i, j int) {
+	b[j], b[i] = b[i], b[j]
+}
+
+func intmin(a, b int) int {
+	if a < b {
+		return a
+	} else {
+		return b
+	}
+}
diff --git a/router/pump.go b/router/pump.go
index 94e34786..5b8f47f1 100644
--- a/router/pump.go
+++ b/router/pump.go
@@ -10,15 +10,17 @@ import (
 	"sync"
 	"time"
 
-	"github.com/fsouza/go-dockerclient"
+	docker "github.com/fsouza/go-dockerclient"
 )
 
 var allowTTY bool
 
 func init() {
 	pump := &LogsPump{
-		pumps:  make(map[string]*containerPump),
-		routes: make(map[chan *update]struct{}),
+		pumps:          make(map[string]*containerPump),
+		routes:         make(map[chan *update]struct{}),
+		reportMetric:   func(sample MetricSample) { metricChannel <- sample },
+		deadPumpAlerts: deadLogStreamAlertChannel,
 	}
 	setAllowTTY()
 	LogRouters.Register(pump, "pump")
@@ -110,7 +112,7 @@ func ignoreContainerTTY(container *docker.Container) bool {
 }
 
 func getInactivityTimeoutFromEnv() time.Duration {
-	inactivityTimeout, err := time.ParseDuration(getopt("INACTIVITY_TIMEOUT", "0"))
+	inactivityTimeout, err := time.ParseDuration(getopt("INACTIVITY_TIMEOUT", "10s"))
 	assert(err, "Couldn't parse env var INACTIVITY_TIMEOUT. See https://golang.org/pkg/time/#ParseDuration for valid format.")
 	return inactivityTimeout
 }
@@ -122,10 +124,12 @@ type update struct {
 
 // LogsPump is responsible for "pumping" logs to their configured destinations
 type LogsPump struct {
-	mu     sync.Mutex
-	pumps  map[string]*containerPump
-	routes map[chan *update]struct{}
-	client *docker.Client
+	mu             sync.Mutex
+	pumps          map[string]*containerPump
+	routes         map[chan *update]struct{}
+	reportMetric   func(MetricSample)
+	deadPumpAlerts chan DeadPumpAlert
+	client         *docker.Client
 }
 
 // Name returns the name of the pump
@@ -163,21 +167,29 @@ func (p *LogsPump) Run() error {
 		return err
 	}
 	for _, listing := range containers {
-		p.pumpLogs(&docker.APIEvents{
-			ID:     normalID(listing.ID),
-			Status: "start",
-		}, false, inactivityTimeout)
+		p.pumpLogs(
+			&docker.APIEvents{
+				ID:     normalID(listing.ID),
+				Status: "start",
+			},
+			false,
+			inactivityTimeout,
+			time.Duration(0),
+		)
 	}
 	events := make(chan *docker.APIEvents)
 	err = p.client.AddEventListener(events)
 	if err != nil {
 		return err
 	}
+
+	go p.consumeDeadPumpAlerts()
+
 	for event := range events {
 		debug("pump.Run() event:", normalID(event.ID), event.Status)
 		switch event.Status {
 		case "start", "restart":
-			go p.pumpLogs(event, backlog(), inactivityTimeout)
+			go p.pumpLogs(event, backlog(), inactivityTimeout, time.Duration(0))
 		case "rename":
 			go p.rename(event)
 		case "die":
@@ -187,7 +199,31 @@ func (p *LogsPump) Run() error {
 	return errors.New("docker event stream closed")
 }
 
-func (p *LogsPump) pumpLogs(event *docker.APIEvents, backlog bool, inactivityTimeout time.Duration) {
+func (p *LogsPump) consumeDeadPumpAlerts() {
+	for deadPumpAlert := range p.deadPumpAlerts {
+
+		containerName := "container nil"
+		if p.pumps[deadPumpAlert.PumpId].container != nil {
+			containerName = p.pumps[deadPumpAlert.PumpId].container.Name
+		}
+		debug("consumeDeadPumpAlerts() pumpId:", deadPumpAlert.PumpId, ", containerName: ", containerName)
+
+		p.pumps[deadPumpAlert.PumpId].halt()
+
+		go p.pumpLogs(
+			&docker.APIEvents{
+				ID:     deadPumpAlert.PumpId,
+				Status: "start",
+			},
+			false,
+			getInactivityTimeoutFromEnv(),
+			deadPumpAlert.DeadFor,
+		)
+	}
+}
+
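+// pumpLogs starts pumping logs for the container referenced by the event. When a pump is
+// restarted by a dead-stream alert, lookBackwards shifts the "since" timestamp into the past
+// so that logs produced while the old pump appeared stalled are fetched again.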
+func (p *LogsPump) pumpLogs(event *docker.APIEvents, backlog bool, inactivityTimeout, lookBackwards time.Duration) {
+
 	id := normalID(event.ID)
 	container, err := p.client.InspectContainer(id)
 	assert(err, "pump")
@@ -210,6 +246,7 @@ func (p *LogsPump) pumpLogs(event *docker.APIEvents, backlog bool, inactivityTim
 		sinceTime = time.Unix(0, 0)
 	} else {
 		sinceTime = time.Now()
+		sinceTime = sinceTime.Add(-lookBackwards)
 	}
 
 	p.mu.Lock()
@@ -227,10 +264,18 @@ func (p *LogsPump) pumpLogs(event *docker.APIEvents, backlog bool, inactivityTim
 	}
 	outrd, outwr := io.Pipe()
 	errrd, errwr := io.Pipe()
-	p.pumps[id] = newContainerPump(container, outrd, errrd)
+	haltPump := func() {
+		outwr.Close()
+		errwr.Close()
+		p.mu.Lock()
+		delete(p.pumps, id)
+		p.mu.Unlock()
+	}
+	p.pumps[id] = newContainerPump(p, id, haltPump, container, outrd, errrd)
 	p.mu.Unlock()
 	p.update(event)
 	go func() {
+
 		for {
 			debug("pump.pumpLogs():", id, "started, tail:", tail)
 			err := p.client.Logs(docker.LogsOptions{
@@ -254,6 +299,8 @@ func (p *LogsPump) pumpLogs(event *docker.APIEvents, backlog bool, inactivityTim
 			sinceTime = time.Now()
 			if err == docker.ErrInactivityTimeout {
 				sinceTime = sinceTime.Add(-inactivityTimeout)
+			} else {
+				time.Sleep(inactivityTimeout)
 			}
 
 			container, err := p.client.InspectContainer(id)
@@ -267,11 +314,7 @@ func (p *LogsPump) pumpLogs(event *docker.APIEvents, backlog bool, inactivityTim
 			}
 
 			debug("pump.pumpLogs():", id, "dead")
-			outwr.Close()
-			errwr.Close()
-			p.mu.Lock()
-			delete(p.pumps, id)
-			p.mu.Unlock()
+			haltPump()
 			return
 		}
 	}()
@@ -351,12 +394,14 @@ func (p *LogsPump) Route(route *Route, logstream chan *Message) {
 
 type containerPump struct {
 	sync.Mutex
+	halt       func()
 	container  *docker.Container
 	logstreams map[chan *Message]*Route
 }
 
-func newContainerPump(container *docker.Container, stdout, stderr io.Reader) *containerPump {
+func newContainerPump(logsPump *LogsPump, pumpId string, halt func(), container *docker.Container, stdout, stderr io.Reader) *containerPump {
 	cp := &containerPump{
+		halt:       halt,
 		container:  container,
 		logstreams: make(map[chan *Message]*Route),
 	}
@@ -370,6 +415,13 @@ func newContainerPump(container *docker.Container, stdout, stderr io.Reader) *co
 			}
 			return
 		}
+
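+		// Each log line successfully read from the container reports a one-count MetricSample;
+		// the aggregator in metrics.go sums these per (PumpId, ContainerName) and flushes them
+		// as InfluxDB line-protocol points every METRICS_FLUSH_INTERVAL.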
+		logsPump.reportMetric(MetricSample{
+			LogCount:      1,
+			ContainerName: normalName(container.Name),
+			PumpId:        pumpId,
+		})
+
 		cp.send(&Message{
 			Data:      strings.TrimSuffix(line, "\n"),
 			Container: container,
diff --git a/router/pump_test.go b/router/pump_test.go
index e05f9f99..667dd5c3 100644
--- a/router/pump_test.go
+++ b/router/pump_test.go
@@ -141,9 +141,11 @@ func TestPumpContainerRename(t *testing.T) {
 	}
 	client := newTestClient(&FakeRoundTripper{message: container, status: http.StatusOK})
 	p := &LogsPump{
-		client: &client,
-		pumps:  make(map[string]*containerPump),
-		routes: make(map[chan *update]struct{}),
+		client:         &client,
+		pumps:          make(map[string]*containerPump),
+		routes:         make(map[chan *update]struct{}),
+		reportMetric:   func(a MetricSample) {},
+		deadPumpAlerts: make(chan DeadPumpAlert),
 	}
 	config := &docker.Config{
 		Tty: false,
@@ -153,7 +155,7 @@ func TestPumpContainerRename(t *testing.T) {
 		Name:   "foo",
 		Config: config,
 	}
-	p.pumps["8dfafdbc3a40"] = newContainerPump(container, os.Stdout, os.Stderr)
+	p.pumps["8dfafdbc3a40"] = newContainerPump(p, "8dfafdbc3a40", func() {}, container, os.Stdout, os.Stderr)
 	if name := p.pumps["8dfafdbc3a40"].container.Name; name != "foo" {
 		t.Errorf("containerPump should have name: 'foo' got name: '%s'", name)
 	}
@@ -171,7 +173,16 @@ func TestPumpNewContainerPump(t *testing.T) {
 		ID:     "8dfafdbc3a40",
 		Config: config,
 	}
-	pump := newContainerPump(container, os.Stdout, os.Stderr)
+	client := newTestClient(&FakeRoundTripper{message: container, status: http.StatusOK})
+	p := &LogsPump{
+		client:         &client,
+		pumps:          make(map[string]*containerPump),
+		routes:         make(map[chan *update]struct{}),
+		reportMetric:   func(a MetricSample) {},
+		deadPumpAlerts: make(chan DeadPumpAlert),
+	}
+
+	pump := newContainerPump(p, "8dfafdbc3a40", func() {}, container, os.Stdout, os.Stderr)
 	if pump == nil {
 		t.Error("pump nil")
 		return
@@ -186,7 +197,15 @@ func TestPumpContainerPump(t *testing.T) {
 		ID:     "8dfafdbc3a40",
 		Config: config,
 	}
-	pump := newContainerPump(container, os.Stdout, os.Stderr)
+	client := newTestClient(&FakeRoundTripper{message: container, status: http.StatusOK})
+	p := &LogsPump{
+		client:         &client,
+		pumps:          make(map[string]*containerPump),
+		routes:         make(map[chan *update]struct{}),
+		reportMetric:   func(a MetricSample) {},
+		deadPumpAlerts: make(chan DeadPumpAlert),
+	}
+	pump := newContainerPump(p, "8dfafdbc3a40", func() {}, container, os.Stdout, os.Stderr)
 	logstream, route := make(chan *Message), &Route{}
 	go func() {
 		for msg := range logstream {