diff --git a/pkg/backend/stackdriver/client.go b/pkg/backend/stackdriver/client.go index 0d0bed3..e9288a2 100644 --- a/pkg/backend/stackdriver/client.go +++ b/pkg/backend/stackdriver/client.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "reflect" + "strings" + "time" "google.golang.org/api/option" @@ -89,25 +91,62 @@ func entryToLogMessage(entry *logging.Entry) common.LogMessage { fmt.Printf("Could not marshall value: %v of type %v", entry.Payload, reflect.TypeOf(entry)) break } - // fmt.Println("JSON: ", string(buf)) message.Attributes = payloadToAttributes(buf) } return message } +func queryToFilter(query common.Query, projectName string, logName string) string { + pieces := []string{fmt.Sprintf(`logName = "projects/%s/logs/%s"`, projectName, logName)} + if query.QueryString != "" { + pieces = append(pieces, fmt.Sprintf(`"%s"`, query.QueryString)) + } + for _, filter := range query.Filters { + pieces = append(pieces, fmt.Sprintf(`jsonPayload.%s %s "%s"`, filter.FieldName, filter.Operator, filter.Value)) + } + if query.After != nil { + pieces = append(pieces, fmt.Sprintf(`timestamp > "%s"`, (*query.After).Format(time.RFC3339))) + } + if query.Before != nil { + pieces = append(pieces, fmt.Sprintf(`timestamp < "%s"`, (*query.Before).Format(time.RFC3339))) + } + return strings.Join(pieces, " AND ") +} + +func (client *StackdriverClient) readLogBatch(ctx context.Context, query common.Query) ([]common.LogMessage, error) { + it := client.stackdriverClient.Entries(ctx, logadmin.Filter(queryToFilter(query, client.projectName, client.logName))) + messages := make([]common.LogMessage, 0, 20) + + entry, err := it.Next() + if err != nil && err != iterator.Done { + return nil, err + } + resultCounter := 1 + for err != iterator.Done && resultCounter <= query.MaxResults { + messages = append(messages, entryToLogMessage(entry)) + entry, err = it.Next() + resultCounter++ + } + return messages, nil +} + func (client *StackdriverClient) Query(ctx context.Context, query common.Query) <-chan common.LogMessage { - it := client.stackdriverClient.Entries(ctx, logadmin.Filter(fmt.Sprintf(`logName = "projects/%s/logs/%s"`, client.projectName, client.logName))) + if query.Follow { + return common.ReQueryFollow(ctx, func() ([]common.LogMessage, error) { + return client.readLogBatch(ctx, query) + }) + } resultChan := make(chan common.LogMessage) go func() { - entry, err := it.Next() - if err != nil && err != iterator.Done { - fmt.Printf("Error retrieving logs: %s\n", err) + messages, err := client.readLogBatch(ctx, query) + if err != nil { + fmt.Printf("Error while fetching logs: %s\n", err) close(resultChan) + return } - for err != iterator.Done { - resultChan <- entryToLogMessage(entry) - entry, err = it.Next() + for _, message := range messages { + resultChan <- message } close(resultChan) }() @@ -128,9 +167,9 @@ func New(credentialsFile, projectName, logName string) *StackdriverClient { } } -func (client *StackdriverClient) ListLogs(ctx context.Context) ([]string, error) { +func (client *StackdriverClient) ListLogs() ([]string, error) { logNames := make([]string, 0, 10) - it := client.stackdriverClient.Logs(ctx) + it := client.stackdriverClient.Logs(context.Background()) s, err := it.Next() if err != nil && err != iterator.Done { return nil, err diff --git a/pkg/backend/stackdriver/client_test.go b/pkg/backend/stackdriver/client_test.go index 575b600..cc13e6e 100644 --- a/pkg/backend/stackdriver/client_test.go +++ b/pkg/backend/stackdriver/client_test.go @@ -3,6 +3,8 @@ package stackdriver import ( "fmt" "testing" + + "github.com/egnyte/ax/pkg/backend/common" ) func TestAttributeDecoding(t *testing.T) { @@ -30,3 +32,22 @@ func TestAttributeDecoding(t *testing.T) { t.Error("obj.name") } } + +func TestQueryToFilter(t *testing.T) { + if queryToFilter(common.Query{}, "my-project", "my-log") != `logName = "projects/my-project/logs/my-log"` { + t.Error("Empty search") + } + if queryToFilter(common.Query{QueryString: "My query"}, "my-project", "my-log") != `logName = "projects/my-project/logs/my-log" AND "My query"` { + t.Error("Basic search filter") + } + if queryToFilter(common.Query{ + Filters: []common.QueryFilter{ + { + FieldName: "name", + Operator: "=", + Value: "pete", + }, + }}, "my-project", "my-log") != `logName = "projects/my-project/logs/my-log" AND jsonPayload.name = "pete"` { + t.Error("Where filter fail") + } +} diff --git a/pkg/backend/stackdriver/main.go b/pkg/backend/stackdriver/main.go deleted file mode 100644 index 6a459a9..0000000 --- a/pkg/backend/stackdriver/main.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - - "cloud.google.com/go/logging" - "cloud.google.com/go/logging/logadmin" - "google.golang.org/api/iterator" -) - -func writeLogs() { - ctx := context.Background() - client, err := logging.NewClient(ctx, "turbo-service") - if err != nil { - panic(err) - } - lg := client.Logger("my-test-log") - - // Add entry to log buffer - err = lg.LogSync(ctx, logging.Entry{Payload: json.RawMessage([]byte(`{"name": "Zef", "age": 34}`))}) - if err != nil { - panic(err) - } - client.Close() -} - -func listLogs(ctx context.Context, client *logadmin.Client) ([]string, error) { - logNames := make([]string, 0, 10) - it := client.Logs(ctx) - s, err := it.Next() - if err != nil && err != iterator.Done { - return nil, err - } - for err != iterator.Done { - logNames = append(logNames, s) - s, err = it.Next() - if err != nil && err != iterator.Done { - return nil, err - } - } - return logNames, nil -} - -func getLogs(ctx context.Context, client *logadmin.Client, logName string) { - it := client.Entries(ctx, logadmin.Filter(fmt.Sprintf(`logName = "projects/turbo-service/logs/%s"`, logName))) - entry, err := it.Next() - if err != nil && err != iterator.Done { - panic(err) - } - for err != iterator.Done { - fmt.Println(entry.Timestamp, entry.Payload) - entry, err = it.Next() - } -} - -func main() { - ctx := context.Background() - client, err := logadmin.NewClient(ctx, "turbo-service") - - if err != nil { - panic(err) - } - - // names, err := listLogs(ctx, client) - // fmt.Println(names, err) - getLogs(ctx, client, "my-test-log") -} diff --git a/pkg/config/config.go b/pkg/config/config.go index 317cad1..e5e993b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -173,7 +173,7 @@ func AddEnv() { if name == "" { name = "default" } - fmt.Print("Choose a backend [kibana,cloudwatch]: ") + fmt.Print("Choose a backend (kibana,cloudwatch,stackdriver) [kibana]: ") backend := readLine(reader) if backend == "" { backend = "kibana" @@ -191,6 +191,11 @@ func AddEnv() { if err != nil { return } + case "stackdriver": + em, err = stackdriverConfig(reader, config) + if err != nil { + return + } default: fmt.Println("Unsupported backend") return diff --git a/pkg/config/stackdriver.go b/pkg/config/stackdriver.go new file mode 100644 index 0000000..8fef98e --- /dev/null +++ b/pkg/config/stackdriver.go @@ -0,0 +1,56 @@ +package config + +import ( + "bufio" + "fmt" + + "github.com/egnyte/ax/pkg/backend/stackdriver" +) + +func testStackdriver(em EnvMap) bool { + cwClient := stackdriver.New(em["credentials"], em["project"], em["log"]) + _, err := cwClient.ListLogs() + return err != nil +} + +func stackdriverConfig(reader *bufio.Reader, existingConfig Config) (EnvMap, error) { + em := EnvMap{ + "backend": "stackdriver", + } + existingSdEnv := findFirstEnvWhere(existingConfig.Environments, func(em EnvMap) bool { + return em["backend"] == "stackdriver" + }) + if existingSdEnv != nil { + credentialsPath := (*existingSdEnv)["credentials"] + fmt.Printf("Path to credentials file (JSON) [%s]: ", credentialsPath) + em["credentials"] = readLine(reader) + if em["credentials"] == "" { + em["credentials"] = (*existingSdEnv)["credentials"] + } + } else { + fmt.Print("Path to credentials file (JSON): ") + em["credentials"] = readLine(reader) + } + fmt.Print("GCP Project name: ") + em["project"] = readLine(reader) + var sdClient *stackdriver.StackdriverClient + var logs []string + var err error + for { + fmt.Println("Attempting to connect to Stackdriver") + sdClient = stackdriver.New(em["credentials"], em["project"], "") + logs, err = sdClient.ListLogs() + if err != nil { + fmt.Printf("Got error connecting to Stackdriver: %s\n", err) + return em, err + } + break + } + fmt.Println("List of logs:") + for _, log := range logs { + fmt.Println(" ", log) + } + fmt.Print("Log: ") + em["log"] = readLine(reader) + return em, nil +}