Skip to content

Commit

Permalink
Merge pull request #576 from zhuoyuan-liu/complete-query
Browse files Browse the repository at this point in the history
Second step for refactor distributed query
  • Loading branch information
javuto authored Jan 7, 2025
2 parents 3851617 + cd78f0c commit 63b2a29
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 209 deletions.
73 changes: 35 additions & 38 deletions admin/handlers/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,93 +156,90 @@ func (h *HandlersAdmin) QueryRunPOSTHandler(w http.ResponseWriter, r *http.Reque
adminErrorResponse(w, "error creating query", http.StatusInternalServerError, err)
return
}
// Temporary list of UUIDs to calculate Expected
var expected []string

// List all the nodes that match the query
var expected []uint

targetNodesID := []uint{}
// TODO: Refactor this to use osctrl-api instead of direct DB queries
// Create environment target
if len(q.Environments) > 0 {
expected = []uint{}
for _, e := range q.Environments {
if (e != "") && h.Envs.Exists(e) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetEnvironment, e); err != nil {
adminErrorResponse(w, "error creating query environment target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
nodes, err := h.Nodes.GetByEnv(e, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
adminErrorResponse(w, "error getting nodes by environment", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create platform target
if len(q.Platforms) > 0 {
expected = []uint{}
platforms, _ := h.Nodes.GetAllPlatforms()
for _, p := range q.Platforms {
if (p != "") && checkValidPlatform(platforms, p) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetPlatform, p); err != nil {
adminErrorResponse(w, "error creating query platform target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
nodes, err := h.Nodes.GetByPlatform(p, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
adminErrorResponse(w, "error getting nodes by platform", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create UUIDs target
if len(q.UUIDs) > 0 {
expected = []uint{}
for _, u := range q.UUIDs {
if (u != "") && h.Nodes.CheckByUUID(u) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetUUID, u); err != nil {
adminErrorResponse(w, "error creating query UUID target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
if u != "" {
node, err := h.Nodes.GetByUUID(u)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", u)
continue
}
expected = append(expected, u)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create hostnames target
if len(q.Hosts) > 0 {
expected = []uint{}
for _, _h := range q.Hosts {
if (_h != "") && h.Nodes.CheckByHost(_h) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetLocalname, _h); err != nil {
adminErrorResponse(w, "error creating query hostname target", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
if _h != "" {
node, err := h.Nodes.GetByIdentifier(_h)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", _h)
continue
}
expected = append(expected, _h)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
// If the list is empty, we don't need to create node queries
if len(targetNodesID) != 0 {
if err := h.Queries.CreateNodeQueries(targetNodesID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node queries for query %s", newQuery.Name)
adminErrorResponse(w, "error creating node queries", http.StatusInternalServerError, err)
return
}
}
// Update value for expected
if err := h.Queries.SetExpected(newQuery.Name, len(expectedClear), env.ID); err != nil {
if err := h.Queries.SetExpected(newQuery.Name, len(targetNodesID), env.ID); err != nil {
adminErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
h.Inc(metricAdminErr)
return
Expand Down
6 changes: 6 additions & 0 deletions admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,12 @@ func osctrlAdminService() {
log.Err(err).Msg("Error getting all environments")
}
for _, e := range allEnvs {
// Periotically check if the queries are completed
// not sure if we need to complete the Carves
if err := queriesmgr.CleanupCompletedQueries(e.ID); err != nil {
log.Err(err).Msg("Error completing expired queries")
}
// Periotically check if the queries are expired
if err := queriesmgr.CleanupExpiredQueries(e.ID); err != nil {
log.Err(err).Msg("Error cleaning up expired queries")
}
Expand Down
80 changes: 39 additions & 41 deletions api/handlers/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,95 +150,93 @@ func (h *HandlersApi) QueriesRunHandler(w http.ResponseWriter, r *http.Request)
return
}

// Temporary list of UUIDs to calculate Expected
var expected []string
// Create targets
// List all the nodes that match the query
var expected []uint

targetNodesID := []uint{}
// Current logic is to select nodes meeting all criteria in the query
// TODO: I believe we should only allow to list nodes in one environment in URL paths
// We will refactor this part to be tag based queries and add more options to the query
if len(q.Environments) > 0 {
expected = []uint{}
for _, e := range q.Environments {
if (e != "") && h.Envs.Exists(e) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetEnvironment, e); err != nil {
apiErrorResponse(w, "error creating query environment target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
nodes, err := h.Nodes.GetByEnv(e, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
apiErrorResponse(w, "error getting nodes by environment", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create platform target
if len(q.Platforms) > 0 {
expected = []uint{}
platforms, _ := h.Nodes.GetAllPlatforms()
for _, p := range q.Platforms {
if (p != "") && checkValidPlatform(platforms, p) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetPlatform, p); err != nil {
apiErrorResponse(w, "error creating query platform target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
nodes, err := h.Nodes.GetByPlatform(p, "active", h.Settings.InactiveHours(settings.NoEnvironmentID))
if err != nil {
apiErrorResponse(w, "error getting nodes by platform", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
}
for _, n := range nodes {
expected = append(expected, n.UUID)
expected = append(expected, n.ID)
}
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create UUIDs target
if len(q.UUIDs) > 0 {
expected = []uint{}
for _, u := range q.UUIDs {
if (u != "") && h.Nodes.CheckByUUID(u) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetUUID, u); err != nil {
apiErrorResponse(w, "error creating query UUID target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
if u != "" {
node, err := h.Nodes.GetByUUID(u)
if err != nil {
log.Warn().Msgf("error getting node %s and failed to create node query for it", u)
continue
}
expected = append(expected, u)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}
// Create hostnames target
// Currently we are using the GetByIdentifier function and it need be more clear
// about the definition of the identifier
if len(q.Hosts) > 0 {
for _, _h := range q.Hosts {
if (_h != "") && h.Nodes.CheckByHost(_h) {
if err := h.Queries.CreateTarget(newQuery.Name, queries.QueryTargetLocalname, _h); err != nil {
apiErrorResponse(w, "error creating query hostname target", http.StatusInternalServerError, err)
h.Inc(metricAPIQueriesErr)
return
expected = []uint{}
for _, hostName := range q.Hosts {
if hostName != "" {
node, err := h.Nodes.GetByIdentifier(hostName)
if err != nil {
log.Warn().Msgf("error getting node %s and failed to create node query for it", hostName)
continue
}
expected = append(expected, _h)
expected = append(expected, node.ID)
}
}
targetNodesID = utils.Intersect(targetNodesID, expected)
}

// Remove duplicates from expected
expectedClear := removeStringDuplicates(expected)

// Create new record for query list
for _, nodeUUID := range expectedClear {
node, err := h.Nodes.GetByUUID(nodeUUID)
if err != nil {
log.Err(err).Msgf("error getting node %s and failed to create node query for it", nodeUUID)
continue
}
if err := h.Queries.CreateNodeQuery(node.ID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node query for query %s and node %s", newQuery.Name, nodeUUID)
// If the list is empty, we don't need to create node queries
if len(targetNodesID) != 0 {
if err := h.Queries.CreateNodeQueries(targetNodesID, newQuery.ID); err != nil {
log.Err(err).Msgf("error creating node queries for query %s", newQuery.Name)
apiErrorResponse(w, "error creating node queries", http.StatusInternalServerError, err)
return
}
}

// Update value for expected
if err := h.Queries.SetExpected(queryName, len(expectedClear), env.ID); err != nil {
if err := h.Queries.SetExpected(queryName, len(targetNodesID), env.ID); err != nil {
apiErrorResponse(w, "error setting expected", http.StatusInternalServerError, err)
h.Inc(metricAPICarvesErr)
return
Expand Down
15 changes: 0 additions & 15 deletions api/handlers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,3 @@ func checkValidPlatform(platforms []string, platform string) bool {
}
return false
}

// Helper to remove duplicates from []string
func removeStringDuplicates(s []string) []string {
seen := make(map[string]struct{}, len(s))
i := 0
for _, v := range s {
if _, ok := seen[v]; ok {
continue
}
seen[v] = struct{}{}
s[i] = v
i++
}
return s[:i]
}
8 changes: 0 additions & 8 deletions logging/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,6 @@ func (logDB *LoggerDB) CleanQueryLogs(entries int64) error {
if err := logDB.Database.Conn.Unscoped().Delete(&queriesTargets).Error; err != nil {
return err
}
// Get query executions
var queriesExecutions []queries.DistributedQueryExecution
if err := logDB.Database.Conn.Where("name = ?", q.Name).Find(&queriesExecutions).Error; err != nil {
return err
}
if err := logDB.Database.Conn.Unscoped().Delete(&queriesExecutions).Error; err != nil {
return err
}
// Delete query
if err := logDB.Database.Conn.Unscoped().Delete(&q).Error; err != nil {
return err
Expand Down
11 changes: 1 addition & 10 deletions logging/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,9 @@ func (l *LoggerTLS) ProcessLogQueryResult(queriesWrite types.QueryWriteRequest,
if err != nil {
log.Err(err).Msg("error updating query")
}
// TODO: This TrackExeuction need be removed
// Add a record for this query
if err := l.Queries.TrackExecution(q, node.UUID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error adding query execution")
}
// Instead of creating a new record in a separate table, we can just update the query status
// Update query status
if err := l.Queries.UpdateQueryStatus(q, node.ID, queriesWrite.Statuses[q]); err != nil {
log.Err(err).Msg("error updating query status")
}
// Check if query is completed
if err := l.Queries.VerifyComplete(q, envid); err != nil {
log.Err(err).Msg("error verifying and completing query")
}
}
}
2 changes: 1 addition & 1 deletion nodes/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (n *NodeManager) UpdateMetadataByUUID(uuid string, metadata NodeMetadata) e
return fmt.Errorf("RecordUsername %v", err)
}
if metadata.Username != node.Username && metadata.Username != "" {
updates["username"] =metadata.Username
updates["username"] = metadata.Username
}
// Record hostname
if err := n.RecordHostname(metadata.Hostname, node); err != nil {
Expand Down
Loading

0 comments on commit 63b2a29

Please sign in to comment.