Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor distributed query #558

Merged
merged 12 commits into from
Dec 12, 2024
8 changes: 8 additions & 0 deletions api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)

// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, id := range expectedClear {
if err := h.Queries.CreateNodeQuery(newQuery.ID, id); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, id)
}
}

// Update value for expected
if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil {
apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
Expand Down
4 changes: 4 additions & 0 deletions logging/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error adding query execution")
}
// Instead of creating a new record in a separate table, we can just update the query status
if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error updating query status")
}
// Check if query is completed
if err := l.Queries.VerifyComplete(q, envid); err != nil {
log.Err(err).Msg("error verifying and completing query")
Expand Down
77 changes: 75 additions & 2 deletions queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ type DistributedQuery struct {
Expiration time.Time
}

// NodeQuery links a node to a distributed query and tracks the per-node
// status of that query. One row represents one (node, query) assignment.
type NodeQuery struct {
	// ID is the auto-incremented primary key of the link row.
	ID uint `gorm:"primaryKey;autoIncrement"`
	// NodeID is the ID of the node the query is assigned to (indexed).
	NodeID uint `gorm:"not null;index"`
	// QueryID is the ID of the assigned distributed query (indexed).
	QueryID uint `gorm:"not null;index"`
	// Status is stored as varchar(10) and defaults to 'pending' in the database.
	Status string `gorm:"type:varchar(10);default:'pending'"`
	// CreatedAt defaults to the database's CURRENT_TIMESTAMP.
	CreatedAt time.Time `gorm:"default:CURRENT_TIMESTAMP"`
	// UpdatedAt carries no explicit tag.
	// NOTE(review): presumably maintained by gorm's timestamp tracking - confirm.
	UpdatedAt time.Time
}

// DistributedQueryTarget to keep target logic for queries
type DistributedQueryTarget struct {
gorm.Model
Expand All @@ -105,8 +115,13 @@ type Queries struct {

// CreateQueries to initialize the queries struct
func CreateQueries(backend *gorm.DB) *Queries {
var q *Queries
q = &Queries{DB: backend}
//var q *Queries
q := &Queries{DB: backend}

// table node_queries
if err := backend.AutoMigrate(&NodeQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (node_queries): %v", err)
}
// table distributed_queries
if err := backend.AutoMigrate(&DistributedQuery{}); err != nil {
log.Fatal().Msgf("Failed to AutoMigrate table (distributed_queries): %v", err)
Expand All @@ -126,6 +141,31 @@ func CreateQueries(backend *gorm.DB) *Queries {
return q
}

// NodeQueries_V2 returns the distributed queries that are still pending for
// the provided node, keyed by query name with the SQL text as the value. The
// set is obtained by joining distributed_queries with node_queries on the
// query ID and filtering by node ID and the "pending" status.
//
// The boolean result is always false in this implementation.
// NOTE(review): presumably it mirrors the second result of NodeQueries - confirm
// what callers expect here.
//
// A database error is returned to the caller together with an empty map.
func (q *Queries) NodeQueries_V2(node nodes.OsqueryNode) (QueryReadQueries, bool, error) {
	// The column tags are required: without them gorm maps QueryName to a
	// "query_name" column, which the SELECT below does not produce, so every
	// name would be scanned as "" and all queries would collapse into one key.
	var rows []struct {
		QueryName string `gorm:"column:name"`
		Query     string `gorm:"column:query"`
	}

	err := q.DB.Table("distributed_queries dq").
		Select("dq.name, dq.query").
		Joins("JOIN node_queries nq ON dq.id = nq.query_id").
		Where("nq.node_id = ? AND nq.status = ?", node.ID, "pending").
		Scan(&rows).Error
	if err != nil {
		return QueryReadQueries{}, false, err
	}

	if len(rows) == 0 {
		return QueryReadQueries{}, false, nil
	}

	qs := make(QueryReadQueries, len(rows))
	for _, row := range rows {
		qs[row.QueryName] = row.Query
	}

	return qs, false, nil
}

// NodeQueries to get all queries that belong to the provided node
// FIXME this will impact the performance of the TLS endpoint due to being CPU and I/O hungry
// FIXME a potential mitigation could be to add a cache (Redis?) layer to store queries per node_key
Expand Down Expand Up @@ -384,6 +424,18 @@ func (q *Queries) Create(query DistributedQuery) error {
return nil
}

// CreateNodeQuery inserts a NodeQuery row that assigns the query identified
// by queryID to the node identified by nodeID. Only the two IDs are set here;
// all other fields are left at their zero values for the insert.
// Note the argument order: node ID first, query ID second.
// It returns the error reported by the database insert, if any.
func (q *Queries) CreateNodeQuery(nodeID, queryID uint) error {
	link := NodeQuery{NodeID: nodeID, QueryID: queryID}
	return q.DB.Create(&link).Error
}

// CreateTarget to create target entry for a given query
func (q *Queries) CreateTarget(name, targetType, targetValue string) error {
queryTarget := DistributedQueryTarget{
Expand Down Expand Up @@ -449,6 +501,27 @@ func (q *Queries) SetExpected(name string, expected int, envid uint) error {
return nil
}

// UpdateQueryStatus to update the status of each query
func (q *Queries) UpdateQueryStatus(queryName string, nodeID uint, statusCode int) error {

var result string
if statusCode == 0 {
result = "completed" // TODO: need be replaced with a constant
zhuoyuan-liu marked this conversation as resolved.
Show resolved Hide resolved
} else {
result = "error"
}
var nodeQuery NodeQuery
// For the current setup, we need a joint query to update the status,
// I am wondering if we can put an extra field in the query so that we also get the query id back from the osquery
if err := q.DB.Where("node_id = ? AND query_id = ?", nodeID, queryName).Find(&nodeQuery).Error; err != nil {
return err
}
if err := q.DB.Model(&nodeQuery).Updates(map[string]interface{}{"status": result}).Error; err != nil {
return err
}
return nil
}

// TrackExecution to keep track of where queries have already ran
func (q *Queries) TrackExecution(name, uuid string, result int) error {
queryExecution := DistributedQueryExecution{
Expand Down