Skip to content

Commit

Permalink
Merge pull request #558 from zhuoyuan-liu/distributed-query
Browse files Browse the repository at this point in the history
Refactor distributed query
  • Loading branch information
javuto authored Dec 12, 2024
2 parents fd46213 + fb2ec8c commit e8b9832
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 40 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ go.work.sum
deploy/docker/conf/tls/*
.env
!deploy/docker/conf/tls/openssl.cnf.example
tls.env

# bruno
tools/bruno/collection.bru
18 changes: 18 additions & 0 deletions admin/handlers/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ func (h *HandlersAdmin) QueryRunPOSTHandler(w http.ResponseWriter, r *http.Reque
h.Inc(metricAdminErr)
return
}
// Get the query id
newQuery, err = h.Queries.Get(newQuery.Name, env.ID)
if err != nil {
adminErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}
// Temporary list of UUIDs to calculate Expected
var expected []string
// Create environment target
Expand Down Expand Up @@ -223,6 +229,18 @@ func (h *HandlersAdmin) QueryRunPOSTHandler(w http.ResponseWriter, r *http.Reque
}
// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
}
}
// Update value for expected
if err := h.Queries.SetExpected(newQuery.Name, len(expectedClear), env.ID); err != nil {
adminErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
Expand Down
19 changes: 19 additions & 0 deletions api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)
h.Inc(metricAPIQueriesErr)
return
}
// Get the query id
newQuery, err = h.Queries.Get(queryName, env.ID)
if err != nil {
apiErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}

// Temporary list of UUIDs to calculate Expected
var expected []string
Expand Down Expand Up @@ -218,6 +224,19 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)

// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
}
}

// Update value for expected
if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil {
apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
Expand Down
6 changes: 6 additions & 0 deletions logging/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
Message: queriesWrite.Messages[q],
}
go l.DispatchQueries(d, node, debug)
// TODO: need be refactored
// Update internal metrics per query
var err error
if queriesWrite.Statuses[q] != 0 {
Expand All @@ -80,10 +81,15 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
if err != nil {
log.Err(err).Msg("error updating query")
}
// TODO: This TrackExeuction need be removed
// Add a record for this query
if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error adding query execution")
}
// Instead of creating a new record in a separate table, we can just update the query status
if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error updating query status")
}
// Check if query is completed
if err := l.Queries.VerifyComplete(q, envid); err != nil {
log.Err(err).Msg("error verifying and completing query")
Expand Down
6 changes: 6 additions & 0 deletions queries/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ replace github.com/jmpsec/osctrl/utils => ../utils
require (
github.com/jmpsec/osctrl/nodes v0.0.0-20241107152746-1f093f5e8faf
github.com/jmpsec/osctrl/utils v0.0.0-20241107150205-621ec8aafdae
github.com/stretchr/testify v1.9.0
gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.12
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
20 changes: 6 additions & 14 deletions queries/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -27,25 +28,16 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg=
gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
108 changes: 82 additions & 26 deletions queries/queries.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queries

import (
"fmt"
"time"

"github.com/jmpsec/osctrl/nodes"
Expand Down Expand Up @@ -57,6 +58,12 @@ const (
TargetHidden string = "hidden"
)

const (
DistributedQueryStatusPending string = "pending"
DistributedQueryStatusCompleted string = "completed"
DistributedQueryStatusError string = "error"
)

// DistributedQuery as abstraction of a distributed query
type DistributedQuery struct {
gorm.Model
Expand All @@ -79,6 +86,14 @@ type DistributedQuery struct {
Expiration time.Time
}

// NodeQuery links a node to a query
type NodeQuery struct {
gorm.Model
NodeID uint `gorm:"not null;index"`
QueryID uint `gorm:"not null;index"`
Status string `gorm:"type:varchar(8);default:'pending'"`
}

// DistributedQueryTarget to keep target logic for queries
type DistributedQueryTarget struct {
gorm.Model
Expand All @@ -105,8 +120,13 @@ type Queries struct {

// CreateQueries to initialize the queries struct
func CreateQueries(backend *gorm.DB) *Queries {
var q *Queries
q = &Queries{DB: backend}
//var q *Queries
q := &Queries{DB: backend}

// table node_queries
if err := backend.AutoMigrate(&NodeQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (node_queries): %v", err)
}
// table distributed_queries
if err := backend.AutoMigrate(&DistributedQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (distributed_queries): %v", err)
Expand All @@ -126,34 +146,29 @@ func CreateQueries(backend *gorm.DB) *Queries {
return q
}

// NodeQueries to get all queries that belong to the provided node
// FIXME this will impact the performance of the TLS endpoint due to being CPU and I/O hungry
// FIMXE potential mitigation can be add a cache (Redis?) layer to store queries per node_key
func (q *Queries) NodeQueries(node nodes.OsqueryNode) (QueryReadQueries, bool, error) {
acelerate := false
// Get all current active queries and carves
queries, err := q.GetActive(node.EnvironmentID)
if err != nil {
return QueryReadQueries{}, false, err

var results []struct {
Name string
Query string
}
// Iterate through active queries, see if they target this node and prepare data in the same loop

q.DB.Table("distributed_queries dq").
Select("dq.name, dq.query").
Joins("JOIN node_queries nq ON dq.id = nq.query_id").
Where("nq.node_id = ? AND nq.status = ?", node.ID, DistributedQueryStatusPending).
Scan(&results)

if len(results) == 0 {
return QueryReadQueries{}, false, nil
}

qs := make(QueryReadQueries)
for _, _q := range queries {
targets, err := q.GetTargets(_q.Name)
if err != nil {
return QueryReadQueries{}, false, err
}
// FIXME disable acceleration until figure out edge cases where it would trigger by mistake
/*
if len(targets) == 1 {
acelerate = true
}
*/
if isQueryTarget(node, targets) && q.NotYetExecuted(_q.Name, node.UUID) {
qs[_q.Name] = _q.Query
}
for _, _q := range results {
qs[_q.Name] = _q.Query
}
return qs, acelerate, nil

return qs, false, nil
}

// Gets all queries by target (active/completed/all/all-full/deleted/hidden/expired)
Expand Down Expand Up @@ -384,6 +399,18 @@ func (q *Queries) Create(query DistributedQuery) error {
return nil
}

// CreateNodeQuery to link a node to a query
func (q *Queries) CreateNodeQuery(nodeID, queryID uint) error {
nodeQuery := NodeQuery{
NodeID: nodeID,
QueryID: queryID,
}
if err := q.DB.Create(&nodeQuery).Error; err != nil {
return err
}
return nil
}

// CreateTarget to create target entry for a given query
func (q *Queries) CreateTarget(name, targetType, targetValue string) error {
queryTarget := DistributedQueryTarget{
Expand Down Expand Up @@ -449,6 +476,35 @@ func (q *Queries) SetExpected(name string, expected int, envid uint) error {
return nil
}

// UpdateQueryStatus to update the status of each query
func (q *Queries) UpdateQueryStatus(queryName string, nodeID uint, statusCode int) error {

var result string
if statusCode == 0 {
result = DistributedQueryStatusCompleted
} else {
result = DistributedQueryStatusError
}

var query DistributedQuery
// TODO: Get the query id
// I think we can put an extra field in the query so that we also get the query id back from the osquery
// This way we can avoid this query to get the query id
if err := q.DB.Where("name = ?", queryName).Find(&query).Error; err != nil {
return fmt.Errorf("error getting query id: %v", err)
}

var nodeQuery NodeQuery

if err := q.DB.Where("node_id = ? AND query_id = ?", nodeID, query.ID).Find(&nodeQuery).Error; err != nil {
return err
}
if err := q.DB.Model(&nodeQuery).Updates(map[string]interface{}{"status": result}).Error; err != nil {
return err
}
return nil
}

// TrackExecution to keep track of where queries have already ran
func (q *Queries) TrackExecution(name, uuid string, result int) error {
queryExecution := DistributedQueryExecution{
Expand Down
Loading

0 comments on commit e8b9832

Please sign in to comment.