Skip to content

Commit

Permalink
Fixes #8: Stackdriver implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Zef Hemel committed Apr 20, 2018
1 parent 7eeb1e2 commit 79ae259
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 80 deletions.
59 changes: 49 additions & 10 deletions pkg/backend/stackdriver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

"google.golang.org/api/option"

Expand Down Expand Up @@ -89,25 +91,62 @@ func entryToLogMessage(entry *logging.Entry) common.LogMessage {
fmt.Printf("Could not marshall value: %v of type %v", entry.Payload, reflect.TypeOf(entry))
break
}
// fmt.Println("JSON: ", string(buf))
message.Attributes = payloadToAttributes(buf)
}
return message
}

func queryToFilter(query common.Query, projectName string, logName string) string {
pieces := []string{fmt.Sprintf(`logName = "projects/%s/logs/%s"`, projectName, logName)}
if query.QueryString != "" {
pieces = append(pieces, fmt.Sprintf(`"%s"`, query.QueryString))
}
for _, filter := range query.Filters {
pieces = append(pieces, fmt.Sprintf(`jsonPayload.%s %s "%s"`, filter.FieldName, filter.Operator, filter.Value))
}
if query.After != nil {
pieces = append(pieces, fmt.Sprintf(`timestamp > "%s"`, (*query.After).Format(time.RFC3339)))
}
if query.Before != nil {
pieces = append(pieces, fmt.Sprintf(`timestamp < "%s"`, (*query.Before).Format(time.RFC3339)))
}
return strings.Join(pieces, " AND ")
}

func (client *StackdriverClient) readLogBatch(ctx context.Context, query common.Query) ([]common.LogMessage, error) {
it := client.stackdriverClient.Entries(ctx, logadmin.Filter(queryToFilter(query, client.projectName, client.logName)))
messages := make([]common.LogMessage, 0, 20)

entry, err := it.Next()
if err != nil && err != iterator.Done {
return nil, err
}
resultCounter := 1
for err != iterator.Done && resultCounter <= query.MaxResults {
messages = append(messages, entryToLogMessage(entry))
entry, err = it.Next()
resultCounter++
}
return messages, nil
}

func (client *StackdriverClient) Query(ctx context.Context, query common.Query) <-chan common.LogMessage {
it := client.stackdriverClient.Entries(ctx, logadmin.Filter(fmt.Sprintf(`logName = "projects/%s/logs/%s"`, client.projectName, client.logName)))
if query.Follow {
return common.ReQueryFollow(ctx, func() ([]common.LogMessage, error) {
return client.readLogBatch(ctx, query)
})
}
resultChan := make(chan common.LogMessage)

go func() {
entry, err := it.Next()
if err != nil && err != iterator.Done {
fmt.Printf("Error retrieving logs: %s\n", err)
messages, err := client.readLogBatch(ctx, query)
if err != nil {
fmt.Printf("Error while fetching logs: %s\n", err)
close(resultChan)
return
}
for err != iterator.Done {
resultChan <- entryToLogMessage(entry)
entry, err = it.Next()
for _, message := range messages {
resultChan <- message
}
close(resultChan)
}()
Expand All @@ -128,9 +167,9 @@ func New(credentialsFile, projectName, logName string) *StackdriverClient {
}
}

func (client *StackdriverClient) ListLogs(ctx context.Context) ([]string, error) {
func (client *StackdriverClient) ListLogs() ([]string, error) {
logNames := make([]string, 0, 10)
it := client.stackdriverClient.Logs(ctx)
it := client.stackdriverClient.Logs(context.Background())
s, err := it.Next()
if err != nil && err != iterator.Done {
return nil, err
Expand Down
21 changes: 21 additions & 0 deletions pkg/backend/stackdriver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package stackdriver
import (
"fmt"
"testing"

"github.com/egnyte/ax/pkg/backend/common"
)

func TestAttributeDecoding(t *testing.T) {
Expand Down Expand Up @@ -30,3 +32,22 @@ func TestAttributeDecoding(t *testing.T) {
t.Error("obj.name")
}
}

func TestQueryToFilter(t *testing.T) {
if queryToFilter(common.Query{}, "my-project", "my-log") != `logName = "projects/my-project/logs/my-log"` {
t.Error("Empty search")
}
if queryToFilter(common.Query{QueryString: "My query"}, "my-project", "my-log") != `logName = "projects/my-project/logs/my-log" AND "My query"` {
t.Error("Basic search filter")
}
if queryToFilter(common.Query{
Filters: []common.QueryFilter{
{
FieldName: "name",
Operator: "=",
Value: "pete",
},
}}, "my-project", "my-log") != `logName = "projects/my-project/logs/my-log" AND jsonPayload.name = "pete"` {
t.Error("Where filter fail")
}
}
69 changes: 0 additions & 69 deletions pkg/backend/stackdriver/main.go

This file was deleted.

7 changes: 6 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func AddEnv() {
if name == "" {
name = "default"
}
fmt.Print("Choose a backend [kibana,cloudwatch]: ")
fmt.Print("Choose a backend (kibana,cloudwatch,stackdriver) [kibana]: ")
backend := readLine(reader)
if backend == "" {
backend = "kibana"
Expand All @@ -191,6 +191,11 @@ func AddEnv() {
if err != nil {
return
}
case "stackdriver":
em, err = stackdriverConfig(reader, config)
if err != nil {
return
}
default:
fmt.Println("Unsupported backend")
return
Expand Down
56 changes: 56 additions & 0 deletions pkg/config/stackdriver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package config

import (
"bufio"
"fmt"

"github.com/egnyte/ax/pkg/backend/stackdriver"
)

func testStackdriver(em EnvMap) bool {
cwClient := stackdriver.New(em["credentials"], em["project"], em["log"])
_, err := cwClient.ListLogs()
return err != nil
}

func stackdriverConfig(reader *bufio.Reader, existingConfig Config) (EnvMap, error) {
em := EnvMap{
"backend": "stackdriver",
}
existingSdEnv := findFirstEnvWhere(existingConfig.Environments, func(em EnvMap) bool {
return em["backend"] == "stackdriver"
})
if existingSdEnv != nil {
credentialsPath := (*existingSdEnv)["credentials"]
fmt.Printf("Path to credentials file (JSON) [%s]: ", credentialsPath)
em["credentials"] = readLine(reader)
if em["credentials"] == "" {
em["credentials"] = (*existingSdEnv)["credentials"]
}
} else {
fmt.Print("Path to credentials file (JSON): ")
em["credentials"] = readLine(reader)
}
fmt.Print("GCP Project name: ")
em["project"] = readLine(reader)
var sdClient *stackdriver.StackdriverClient
var logs []string
var err error
for {
fmt.Println("Attempting to connect to Stackdriver")
sdClient = stackdriver.New(em["credentials"], em["project"], "")
logs, err = sdClient.ListLogs()
if err != nil {
fmt.Printf("Got error connecting to Stackdriver: %s\n", err)
return em, err
}
break
}
fmt.Println("List of logs:")
for _, log := range logs {
fmt.Println(" ", log)
}
fmt.Print("Log: ")
em["log"] = readLine(reader)
return em, nil
}

0 comments on commit 79ae259

Please sign in to comment.