Skip to content
This repository has been archived by the owner on Jul 28, 2023. It is now read-only.

Add Database Log table to query database logs for a workspace. #23

Merged
merged 5 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions docs/tables/steampipecloud_workspace_db_log.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Table: steampipecloud_workspace_db_log

Database logs records the underlying queries executed when a user executes a query.

## Examples

### List db logs for an actor by handle

```sql
select
id,
workspace_id,
workspace_handle,
duration,
query,
log_timestamp
from
steampipecloud_workspace_db_log
where
actor_handle = 'siddharthaturbot';
```

### List db logs for an actor by handle in a particular workspace

```sql
select
id,
workspace_id,
workspace_handle,
duration,
query,
log_timestamp
from
steampipecloud_workspace_db_log
where
actor_handle = 'siddharthaturbot'
and workspace_handle = 'dev';
```

### List queries that took more than 30 seconds to execute

```sql
select
id,
workspace_id,
workspace_handle,
duration,
query,
log_timestamp
from
steampipecloud_workspace_db_log
where
duration > 30000;
```

### List all queries that ran in my workspace in the last hour

```sql
select
id,
workspace_id,
workspace_handle,
duration,
query,
log_timestamp
from
steampipecloud_workspace_db_log
where
workspace_handle = 'dev'
and log_timestamp > now() - interval '1 hr';
```
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module steampipe-plugin-steampipecloud
go 1.18

require (
github.com/turbot/steampipe-cloud-sdk-go v0.1.2
github.com/turbot/steampipe-cloud-sdk-go v0.1.3
github.com/turbot/steampipe-plugin-sdk/v3 v3.3.1
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ github.com/tkrajina/go-reflector v0.5.4 h1:dS9aJEa/eYNQU/fwsb5CSiATOxcNyA/gG/A7a
github.com/tkrajina/go-reflector v0.5.4/go.mod h1:9PyLgEOzc78ey/JmQQHbW8cQJ1oucLlNQsg8yFvkVk8=
github.com/turbot/go-kit v0.4.0 h1:EdD7Bf2EGAjvHRGQxRiWpDawzZSk3T+eghqbj74qiSc=
github.com/turbot/go-kit v0.4.0/go.mod h1:SBdPRngbEfYubiR81iAVtO43oPkg1+ASr+XxvgbH7/k=
github.com/turbot/steampipe-cloud-sdk-go v0.1.3 h1:j4D2S9XCATa0o1wxLuKr/F/uJM8shZer0/zvTfWOgvE=
github.com/turbot/steampipe-cloud-sdk-go v0.1.3/go.mod h1:8M2CspUHgCGqDCJV+FNn+boBPyLRHyzDinYnoZ/kZYw=
github.com/turbot/steampipe-plugin-sdk/v3 v3.3.1 h1:y6ExizWQkkllN5t4S3nFFaZuyttIW8agg/CNuhCjUoE=
github.com/turbot/steampipe-plugin-sdk/v3 v3.3.1/go.mod h1:8r7CDDlrSUd5iUgPlvPa5ttlZ4OEFNMHm8fdwgnn5WM=
github.com/turbot/steampipe-cloud-sdk-go v0.1.2 h1:H0F3zld6kX4uO2B5igV/r0GSOgqm3Zu6VEBHcwXvt1Y=
github.com/turbot/steampipe-cloud-sdk-go v0.1.2/go.mod h1:8M2CspUHgCGqDCJV+FNn+boBPyLRHyzDinYnoZ/kZYw=
github.com/vmihailenco/msgpack v3.3.3+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/vmihailenco/msgpack/v4 v4.3.12/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4=
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
Expand Down
1 change: 1 addition & 0 deletions steampipecloud/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func Plugin(ctx context.Context) *plugin.Plugin {
"steampipecloud_workspace_connection": tableSteampipeCloudWorkspaceConnection(ctx),
"steampipecloud_workspace_mod": tableSteampipeCloudWorkspaceMod(ctx),
"steampipecloud_workspace_mod_variable": tableSteampipeCloudWorkspaceModVariable(ctx),
"steampipecloud_workspace_db_log": tableSteampipeCloudWorkspaceDbLog(ctx),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"steampipecloud_workspace_db_log": tableSteampipeCloudWorkspaceDbLog(ctx),
"steampipecloud_workspace_db_log": tableSteampipeCloudWorkspaceDBLog(ctx),

},
}

Expand Down
242 changes: 242 additions & 0 deletions steampipecloud/table_steampipecloud_db_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package steampipecloud

import (
"context"

openapi "github.com/turbot/steampipe-cloud-sdk-go"

"github.com/turbot/steampipe-plugin-sdk/v3/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v3/plugin"
"github.com/turbot/steampipe-plugin-sdk/v3/plugin/transform"
)

//// TABLE DEFINITION

func tableSteampipeCloudWorkspaceDbLog(_ context.Context) *plugin.Table {
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
return &plugin.Table{
Name: "steampipecloud_workspace_db_log",
Description: "Database logs records the underlying queries executed when a user executes a query.",
List: &plugin.ListConfig{
ParentHydrate: listWorkspaces,
Hydrate: listWorkspaceDbLogs,
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
},
Columns: []*plugin.Column{
{
Name: "id",
Description: "The unique identifier for a db log.",
Type: proto.ColumnType_STRING,
Transform: transform.FromCamel(),
},
{
Name: "actor_id",
Description: "The unique identifier for the user who executed the query.",
Type: proto.ColumnType_STRING,
Transform: transform.FromCamel(),
},
{
Name: "actor_handle",
Description: "The handle of the user who executed the query.",
Type: proto.ColumnType_STRING,
},
{
Name: "actor_display_name",
Description: "The display name of the user who executed the query.",
Type: proto.ColumnType_STRING,
},
{
Name: "actor_avatar_url",
Description: "The avatar of the user who executed the query.",
Type: proto.ColumnType_STRING,
Transform: transform.FromCamel(),
},
{
Name: "workspace_id",
Description: "The unique identifier of the workspace on which the query was executed.",
Type: proto.ColumnType_STRING,
Transform: transform.FromCamel(),
},
{
Name: "workspace_handle",
Description: "The handle of the workspace on which the query was executed.",
Type: proto.ColumnType_STRING,
},
{
Name: "duration",
Description: "The duration of the query in milliseconds(ms).",
Type: proto.ColumnType_DOUBLE,
Transform: transform.FromCamel(),
},
{
Name: "query",
Description: "The query that was executed in the workspace.",
Type: proto.ColumnType_STRING,
},
{
Name: "log_timestamp",
Description: "The time when the log got captured in postgres.",
Type: proto.ColumnType_TIMESTAMP,
},
{
Name: "created_at",
Description: "The time when the db log record was generated.",
Type: proto.ColumnType_STRING,
},
},
}
}

//// LIST FUNCTION

func listWorkspaceDbLogs(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
// Get the workspace object from the parent hydrate
workspace := h.Item.(*openapi.Workspace)

// Create the connection
svc, err := connect(ctx, d)
if err != nil {
plugin.Logger(ctx).Error("listDbLogs", "connection_error", err)
return nil, err
}

// Get cached user identity
getUserIdentityCached := plugin.HydrateFunc(getUserIdentity).WithCache()
commonData, err := getUserIdentityCached(ctx, d, h)
if err != nil {
plugin.Logger(ctx).Error("listDbLogs", "getUserIdentityCached", err)
return nil, err
}

// Extract the user object from the cached identity
user := commonData.(openapi.User)

// If the requested number of items is less than the paging max limit
// set the limit to that instead
maxResults := int32(100)
limit := d.QueryContext.Limit
if d.QueryContext.Limit != nil {
if *limit < int64(maxResults) {
if *limit < 1 {
maxResults = int32(1)
} else {
maxResults = int32(*limit)
}
}
}

// If we want to get the db logs for the user
if user.Id == workspace.IdentityId {
err = listUserWorkspaceDbLogs(ctx, d, h, svc, maxResults, user.Id, workspace.Id)
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
} else {
err = listOrgWorkspaceDbLogs(ctx, d, h, svc, maxResults, workspace.IdentityId, workspace.Id)
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
}
if err != nil {
plugin.Logger(ctx).Error("listDbLogs", "error", err)
return nil, err
}

if err != nil {
plugin.Logger(ctx).Error("listDbLogs", "error", err)
return nil, err
}
return nil, nil
}

func listUserWorkspaceDbLogs(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData, svc *openapi.APIClient, maxResults int32, identityId, workspaceId string) error {
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
var err error

// execute list call
pagesLeft := true
var resp openapi.ListLogsResponse
var listDetails func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error)

for pagesLeft {
if resp.NextToken != nil {
listDetails = func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
resp, _, err = svc.UserWorkspaces.ListDBLogs(ctx, identityId, workspaceId).NextToken(*resp.NextToken).Limit(maxResults).Execute()
return resp, err
}
} else {
listDetails = func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
resp, _, err = svc.UserWorkspaces.ListDBLogs(ctx, identityId, workspaceId).Limit(maxResults).Execute()
return resp, err
}
}

response, err := plugin.RetryHydrate(ctx, d, h, listDetails, &plugin.RetryConfig{ShouldRetryError: shouldRetryError})

if err != nil {
plugin.Logger(ctx).Error("listUserDbLogs", "list", err)
return err
}

result := response.(openapi.ListLogsResponse)

if result.HasItems() {
for _, log := range *result.Items {
d.StreamListItem(ctx, log)

// Context can be cancelled due to manual cancellation or the limit has been hit
if d.QueryStatus.RowsRemaining(ctx) == 0 {
return nil
}
}
}
if resp.NextToken == nil {
pagesLeft = false
} else {
resp.NextToken = result.NextToken
}
}

return nil
}

func listOrgWorkspaceDbLogs(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData, svc *openapi.APIClient, maxResults int32, identityId, workspaceId string) error {
sidr0cker marked this conversation as resolved.
Show resolved Hide resolved
var err error

// execute list call
pagesLeft := true
var resp openapi.ListLogsResponse
var listDetails func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error)

for pagesLeft {
if resp.NextToken != nil {
listDetails = func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
resp, _, err = svc.OrgWorkspaces.ListDBLogs(ctx, identityId, workspaceId).NextToken(*resp.NextToken).Limit(maxResults).Execute()
return resp, err
}
} else {
listDetails = func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
resp, _, err = svc.OrgWorkspaces.ListDBLogs(ctx, identityId, workspaceId).Limit(maxResults).Execute()
return resp, err
}
}

response, err := plugin.RetryHydrate(ctx, d, h, listDetails, &plugin.RetryConfig{ShouldRetryError: shouldRetryError})

if err != nil {
plugin.Logger(ctx).Error("listOrgDbLogs", "list", err)
return err
}

result := response.(openapi.ListLogsResponse)

if result.HasItems() {
for _, log := range *result.Items {
d.StreamListItem(ctx, log)

// Context can be cancelled due to manual cancellation or the limit has been hit
if d.QueryStatus.RowsRemaining(ctx) == 0 {
return nil
}
}
}
if result.NextToken == nil {
pagesLeft = false
} else {
resp.NextToken = result.NextToken
}
}

return nil
}
4 changes: 2 additions & 2 deletions steampipecloud/table_steampipecloud_organization.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func listOrganizations(ctx context.Context, d *plugin.QueryData, h *plugin.Hydra
// execute list call
pagesLeft := true

var resp openapi.ListActorOrgsResponse
var resp openapi.ListUserOrgsResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siddharthaturbot Is this change related to the table that this PR is adding? If not, can you please describe why we're making this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ListActorOrgsResponse is not used by the SDK anymore. Its necessary to add it in the PR or it wont compile.

var listDetails func(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error)

for pagesLeft {
Expand All @@ -133,7 +133,7 @@ func listOrganizations(ctx context.Context, d *plugin.QueryData, h *plugin.Hydra
return nil, err
}

result := response.(openapi.ListActorOrgsResponse)
result := response.(openapi.ListUserOrgsResponse)

if result.HasItems() {
for _, org := range *result.Items {
Expand Down
6 changes: 0 additions & 6 deletions steampipecloud/table_steampipecloud_workspace_mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ func listUserWorkspaceMods(ctx context.Context, d *plugin.QueryData, h *plugin.H

if result.HasItems() {
for _, workspaceMod := range *result.Items {
workspaceMod.Workspace = &openapi.Workspace{}
workspaceMod.Workspace.Handle = workspaceHandle
d.StreamListItem(ctx, workspaceMod)

// Context can be cancelled due to manual cancellation or the limit has been hit
Expand Down Expand Up @@ -231,8 +229,6 @@ func listOrgWorkspaceMods(ctx context.Context, d *plugin.QueryData, h *plugin.Hy

if result.HasItems() {
for _, workspaceMod := range *result.Items {
workspaceMod.Workspace = &openapi.Workspace{}
workspaceMod.Workspace.Handle = workspaceHandle
d.StreamListItem(ctx, workspaceMod)

// Context can be cancelled due to manual cancellation or the limit has been hit
Expand Down Expand Up @@ -312,7 +308,6 @@ func getUserWorkspaceMod(ctx context.Context, d *plugin.QueryData, h *plugin.Hyd
response, err := plugin.RetryHydrate(ctx, d, h, getDetails, &plugin.RetryConfig{ShouldRetryError: shouldRetryError})

workspaceMod := response.(openapi.WorkspaceMod)
workspaceMod.Workspace = &openapi.Workspace{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related to the table we're adding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, Workspace is not part of the struct anymore.


if err != nil {
plugin.Logger(ctx).Error("getUserWorkspaceMod", "get", err)
Expand All @@ -336,7 +331,6 @@ func getOrgWorkspaceMod(ctx context.Context, d *plugin.QueryData, h *plugin.Hydr
response, err := plugin.RetryHydrate(ctx, d, h, getDetails, &plugin.RetryConfig{ShouldRetryError: shouldRetryError})

workspaceMod := response.(openapi.WorkspaceMod)
workspaceMod.Workspace = &openapi.Workspace{}

if err != nil {
plugin.Logger(ctx).Error("getOrgWorkspaceMod", "get", err)
Expand Down