Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor distributed query #558

Merged
merged 12 commits into from
Dec 12, 2024
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ go.work.sum
deploy/docker/conf/tls/*
.env
!deploy/docker/conf/tls/openssl.cnf.example
tls.env

# bruno
tools/bruno/collection.bru
18 changes: 18 additions & 0 deletions admin/handlers/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ func (h *HandlersAdmin) QueryRunPOSTHandler(w http.ResponseWriter, r *http.Reque
h.Inc(metricAdminErr)
return
}
// Get the query id
newQuery, err = h.Queries.Get(newQuery.Name, env.ID)
if err != nil {
adminErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}
// Temporary list of UUIDs to calculate Expected
var expected []string
// Create environment target
Expand Down Expand Up @@ -223,6 +229,18 @@ func (h *HandlersAdmin) QueryRunPOSTHandler(w http.ResponseWriter, r *http.Reque
}
// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
zhuoyuan-liu marked this conversation as resolved.
Show resolved Hide resolved
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
}
}
// Update value for expected
if err := h.Queries.SetExpected(newQuery.Name, len(expectedClear), env.ID); err != nil {
adminErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
Expand Down
19 changes: 19 additions & 0 deletions api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)
h.Inc(metricAPIQueriesErr)
return
}
// Get the query id
newQuery, err = h.Queries.Get(queryName, env.ID)
if err != nil {
apiErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}

// Temporary list of UUIDs to calculate Expected
var expected []string
Expand Down Expand Up @@ -218,6 +224,19 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)

// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
}
}

// Update value for expected
if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil {
apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
Expand Down
6 changes: 6 additions & 0 deletions logging/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
Message: queriesWrite.Messages[q],
}
go l.DispatchQueries(d, node, debug)
// TODO: need be refactored
// Update internal metrics per query
var err error
if queriesWrite.Statuses[q] != 0 {
Expand All @@ -80,10 +81,15 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
if err != nil {
log.Err(err).Msg("error updating query")
}
// TODO: This TrackExeuction need be removed
// Add a record for this query
if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error adding query execution")
}
// Instead of creating a new record in a separate table, we can just update the query status
if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error updating query status")
}
// Check if query is completed
if err := l.Queries.VerifyComplete(q, envid); err != nil {
log.Err(err).Msg("error verifying and completing query")
Expand Down
6 changes: 6 additions & 0 deletions queries/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ replace github.com/jmpsec/osctrl/utils => ../utils
require (
github.com/jmpsec/osctrl/nodes v0.0.0-20241107152746-1f093f5e8faf
github.com/jmpsec/osctrl/utils v0.0.0-20241107150205-621ec8aafdae
github.com/stretchr/testify v1.9.0
gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.12
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
20 changes: 6 additions & 14 deletions queries/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -27,25 +28,16 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg=
gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
108 changes: 82 additions & 26 deletions queries/queries.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queries

import (
"fmt"
"time"

"github.com/jmpsec/osctrl/nodes"
Expand Down Expand Up @@ -57,6 +58,12 @@ const (
TargetHidden string = "hidden"
)

const (
DistributedQueryStatusPending string = "pending"
DistributedQueryStatusCompleted string = "completed"
DistributedQueryStatusError string = "error"
)

// DistributedQuery as abstraction of a distributed query
type DistributedQuery struct {
gorm.Model
Expand All @@ -79,6 +86,14 @@ type DistributedQuery struct {
Expiration time.Time
}

// NodeQuery links a node to a query
type NodeQuery struct {
zhuoyuan-liu marked this conversation as resolved.
Show resolved Hide resolved
gorm.Model
NodeID uint `gorm:"not null;index"`
QueryID uint `gorm:"not null;index"`
Status string `gorm:"type:varchar(8);default:'pending'"`
}

// DistributedQueryTarget to keep target logic for queries
type DistributedQueryTarget struct {
gorm.Model
Expand All @@ -105,8 +120,13 @@ type Queries struct {

// CreateQueries to initialize the queries struct
func CreateQueries(backend *gorm.DB) *Queries {
var q *Queries
q = &Queries{DB: backend}
//var q *Queries
q := &Queries{DB: backend}

// table node_queries
if err := backend.AutoMigrate(&NodeQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (node_queries): %v", err)
}
// table distributed_queries
if err := backend.AutoMigrate(&DistributedQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (distributed_queries): %v", err)
Expand All @@ -126,34 +146,29 @@ func CreateQueries(backend *gorm.DB) *Queries {
return q
}

// NodeQueries to get all queries that belong to the provided node
// FIXME this will impact the performance of the TLS endpoint due to being CPU and I/O hungry
// FIMXE potential mitigation can be add a cache (Redis?) layer to store queries per node_key
func (q *Queries) NodeQueries(node nodes.OsqueryNode) (QueryReadQueries, bool, error) {
acelerate := false
// Get all current active queries and carves
queries, err := q.GetActive(node.EnvironmentID)
if err != nil {
return QueryReadQueries{}, false, err

var results []struct {
Name string
Query string
}
// Iterate through active queries, see if they target this node and prepare data in the same loop

q.DB.Table("distributed_queries dq").
Select("dq.name, dq.query").
Joins("JOIN node_queries nq ON dq.id = nq.query_id").
Where("nq.node_id = ? AND nq.status = ?", node.ID, DistributedQueryStatusPending).
Scan(&results)

if len(results) == 0 {
return QueryReadQueries{}, false, nil
}

qs := make(QueryReadQueries)
for _, _q := range queries {
targets, err := q.GetTargets(_q.Name)
if err != nil {
return QueryReadQueries{}, false, err
}
// FIXME disable acceleration until figure out edge cases where it would trigger by mistake
/*
if len(targets) == 1 {
acelerate = true
}
*/
if isQueryTarget(node, targets) && q.NotYetExecuted(_q.Name, node.UUID) {
qs[_q.Name] = _q.Query
}
for _, _q := range results {
qs[_q.Name] = _q.Query
}
return qs, acelerate, nil

return qs, false, nil
}

// Gets all queries by target (active/completed/all/all-full/deleted/hidden/expired)
Expand Down Expand Up @@ -384,6 +399,18 @@ func (q *Queries) Create(query DistributedQuery) error {
return nil
}

// CreateNodeQuery to link a node to a query
func (q *Queries) CreateNodeQuery(nodeID, queryID uint) error {
nodeQuery := NodeQuery{
NodeID: nodeID,
QueryID: queryID,
}
if err := q.DB.Create(&nodeQuery).Error; err != nil {
return err
}
return nil
}

// CreateTarget to create target entry for a given query
func (q *Queries) CreateTarget(name, targetType, targetValue string) error {
queryTarget := DistributedQueryTarget{
Expand Down Expand Up @@ -449,6 +476,35 @@ func (q *Queries) SetExpected(name string, expected int, envid uint) error {
return nil
}

// UpdateQueryStatus to update the status of each query
func (q *Queries) UpdateQueryStatus(queryName string, nodeID uint, statusCode int) error {

var result string
if statusCode == 0 {
result = DistributedQueryStatusCompleted
} else {
result = DistributedQueryStatusError
}

var query DistributedQuery
// TODO: Get the query id
// I think we can put an extra field in the query so that we also get the query id back from the osquery
// This way we can avoid this query to get the query id
if err := q.DB.Where("name = ?", queryName).Find(&query).Error; err != nil {
return fmt.Errorf("error getting query id: %v", err)
}

var nodeQuery NodeQuery

if err := q.DB.Where("node_id = ? AND query_id = ?", nodeID, query.ID).Find(&nodeQuery).Error; err != nil {
return err
}
if err := q.DB.Model(&nodeQuery).Updates(map[string]interface{}{"status": result}).Error; err != nil {
return err
}
return nil
}

// TrackExecution to keep track of where queries have already ran
func (q *Queries) TrackExecution(name, uuid string, result int) error {
queryExecution := DistributedQueryExecution{
Expand Down
Loading