Merge pull request #4 from dbgeek/poor_mans_tail
feat: tail command
dbgeek authored Mar 15, 2019
2 parents 318118b + 0c7a28c commit 547ac08
Showing 2 changed files with 145 additions and 4 deletions.
65 changes: 65 additions & 0 deletions cmd/tail.go
@@ -0,0 +1,65 @@
package cmd

import (
	"bytes"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/dbgeek/elblogcat/logcat"
	"github.com/dbgeek/elblogcat/logworker"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// tailCmd represents the tail command
var tailCmd = &cobra.Command{
	Use:   "tail",
	Short: "Poor man's tail: poll for new access logs, by default every 1 min",
	Long: `
`,
	Run: func(cmd *cobra.Command, args []string) {
		awsConfiguration := logworker.AWSconfiguration{Region: "eu-west-1"}
		configuration := logworker.NewConfiguration()
		accessLogFilter := logworker.NewAccessLogFilter()
		client := logworker.NewLogWorker(
			&awsConfiguration,
			&configuration,
			&accessLogFilter,
		)
		logs := make(chan string, 1)

		// Tail runs in the background and sends the object name of every
		// newly discovered access log on the channel.
		client.Tail(logs)

		for v := range logs {
			buff := &aws.WriteAtBuffer{}
			key := fmt.Sprintf("%s%s", accessLogFilter.AccesslogPath(configuration.Prefix), v)
			_, err := client.S3Downloader.Download(buff, &s3.GetObjectInput{
				Bucket: aws.String(viper.GetString("s3-bucket")),
				Key:    aws.String(key),
			})
			if err != nil {
				logworker.Logger.Fatalf("Failed to download key: %v from S3. Got error: %v",
					key,
					err)
			}

			c := logcat.NewRowFilter()
			b := bytes.NewBuffer(buff.Bytes())
			a := logcat.Accesslog{
				Content:     b,
				RowFilter:   c,
				PrintFields: viper.GetString("fields"),
			}
			a.Cat()
		}
	},
}

func init() {
	rootCmd.AddCommand(tailCmd)
	tailCmd.PersistentFlags().Duration("polling-interval", 60*time.Second, "how often to poll S3 for new access logs")
	viper.BindPFlag("polling-interval", tailCmd.PersistentFlags().Lookup("polling-interval"))
}
84 changes: 80 additions & 4 deletions logworker/logworker.go
@@ -31,8 +31,9 @@ type (
		Profile string
	}
	Configuration struct {
		Bucket          string
		Prefix          string
		PollingInterval time.Duration
	}
	AccessLogFilter struct {
		matchString string
@@ -138,6 +139,80 @@ func (l *LogWorker) List() []string {
	return accessLogs
}

func (l *LogWorker) Tail(logch chan<- string) {
	go func() {
		accessLogFilter := NewAccessLogFilter()
		consumedAccessLogs := make(map[string]struct{})

		// Back-fill: walk from the filter's start time up to now in
		// 5-minute steps and emit every access log not seen before.
		lbAccessLogTimestamp := l.AccessLogFilter.StartTime
		for t := lbAccessLogTimestamp; t.Before(time.Now().UTC()); t = t.Add(5 * time.Minute) {
			lbAccessLogTimestamp = t
			lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
				accessLogFilter.AwsAccountID,
				accessLogFilter.Region,
				accessLogFilter.LoadBalancerID,
				t.Format("20060102T1504Z"),
			)
			s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog)
			for _, accessLog := range *l.listAccessLogs(s3Prefix) {
				if _, ok := consumedAccessLogs[accessLog]; !ok {
					consumedAccessLogs[accessLog] = struct{}{}
					logch <- accessLog
				}
			}
		}

		// Poll: on every tick, list the objects matching the current
		// minute's prefix and emit the ones not seen before.
		poller := time.Tick(l.Configuration.PollingInterval)
		for now := range poller {
			lbAccessLogTimestamp = lbAccessLogTimestamp.Add(15 * time.Second)
			lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
				accessLogFilter.AwsAccountID,
				accessLogFilter.Region,
				accessLogFilter.LoadBalancerID,
				now.UTC().Format("20060102T1504Z"),
			)
			s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog)
			for _, accessLog := range *l.listAccessLogs(s3Prefix) {
				if _, ok := consumedAccessLogs[accessLog]; !ok {
					consumedAccessLogs[accessLog] = struct{}{}
					logch <- accessLog
				}
			}
			// Drop bookkeeping entries older than two minutes so the
			// map does not grow without bound.
			for k := range consumedAccessLogs {
				ts := strings.Split(k, "_")
				t, _ := time.Parse("20060102T1504Z", ts[4])
				if t.Before(now.UTC().Add(-2 * time.Minute)) {
					delete(consumedAccessLogs, k)
				}
			}
		}
	}()
}
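For orientation (not part of this commit), here is a minimal sketch of the S3 prefix Tail hands to listAccessLogs for a given minute. The account ID, region, and load-balancer ID are made-up placeholders; the format strings mirror the AccesslogPath and fmt.Sprintf calls above. ELB delivers access logs in roughly 5-minute batches, which is why the back-fill loop steps 5 minutes at a time.

package main

import (
	"fmt"
	"path/filepath"
	"time"
)

func main() {
	// Placeholder identifiers; a real prefix uses the values from AccessLogFilter.
	account, region, lbID := "123456789012", "eu-west-1", "app.my-lb.0123456789abcdef"
	t := time.Date(2019, 3, 15, 10, 20, 0, 0, time.UTC)

	datePath := filepath.Join("my-prefix", fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/",
		account, region, t.Format("2006/01/02"))) + "/"
	name := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
		account, region, lbID, t.Format("20060102T1504Z"))

	fmt.Println(filepath.Join(datePath, name))
	// my-prefix/AWSLogs/123456789012/elasticloadbalancing/eu-west-1/2019/03/15/123456789012_elasticloadbalancing_eu-west-1_app.my-lb.0123456789abcdef_20190315T1020Z
}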

func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string {
	var al []string
	input := &s3.ListObjectsV2Input{
		Bucket:    aws.String(l.Configuration.Bucket),
		Prefix:    aws.String(s3Prefix),
		Delimiter: aws.String("/"),
		MaxKeys:   aws.Int64(200),
	}
	err := l.S3.ListObjectsV2Pages(input,
		func(page *s3.ListObjectsV2Output, lastPage bool) bool {
			for _, val := range page.Contents {
				accessLog := strings.Split(*val.Key, "/")[len(strings.Split(*val.Key, "/"))-1]
				al = append(al, accessLog)
			}
			return true
		})
	if err != nil {
		fmt.Println(err)
	}
	return &al
}

func (a *AccessLogFilter) AccesslogPath(prefix string) string {
	return filepath.Join(prefix, fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/", a.AwsAccountID, a.Region, a.StartTime.Format("2006/01/02"))) + "/"

@@ -197,7 +272,8 @@ func NewAccessLogFilter() AccessLogFilter {

func NewConfiguration() Configuration {
	return Configuration{
		Bucket:          viper.GetString("s3-bucket"),
		Prefix:          viper.GetString("s3-prefix"),
		PollingInterval: viper.GetDuration("polling-interval"),
	}
}
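As a side note (not part of this commit), NewConfiguration reads all three values from viper, so the polling interval can come from the new --polling-interval flag bound in cmd/tail.go or from any other viper source. A minimal sketch with placeholder values:

package main

import (
	"fmt"
	"time"

	"github.com/dbgeek/elblogcat/logworker"
	"github.com/spf13/viper"
)

func main() {
	// Placeholder values; in the CLI these come from flags and config files.
	viper.Set("s3-bucket", "my-alb-logs")
	viper.Set("s3-prefix", "prod")
	viper.Set("polling-interval", 30*time.Second)

	cfg := logworker.NewConfiguration()
	fmt.Println(cfg.Bucket, cfg.Prefix, cfg.PollingInterval) // my-alb-logs prod 30s
}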
