From d34eaa407588ecaea458f467631de44fc08bc74f Mon Sep 17 00:00:00 2001 From: Reuben Lifshay Date: Thu, 5 Oct 2023 15:15:53 -0700 Subject: [PATCH 1/3] fix: prevent multiple jobs from being queued if one gets delayed --- cmd/start.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index d7d304b..ab62d54 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -32,6 +32,7 @@ import ( "os/signal" "runtime" "sort" + "sync" "time" "github.com/bloodhoundad/azurehound/v2/client/rest" @@ -98,18 +99,33 @@ func start(ctx context.Context) { var ( currentJob *models.ClientJob + jobQueued bool + mutex sync.Mutex ) for { + mutex.Lock() + loopJobQueued := jobQueued + loopJobExists := currentJob != nil + mutex.Unlock() select { case <-ticker.C: - if currentJob != nil { + if loopJobExists { log.V(1).Info("collection in progress...", "jobId", currentJob.ID) if err := checkin(ctx, *bheInstance, bheClient); err != nil { log.Error(err, "bloodhound enterprise service checkin failed") } - } else { + } else if !loopJobQueued { + mutex.Lock() + jobQueued = true + mutex.Unlock() go func() { + defer (func() { + mutex.Lock() + jobQueued = false + currentJob = nil + mutex.Unlock() + })() log.V(2).Info("checking for available collection jobs") if jobs, err := getAvailableJobs(ctx, *bheInstance, bheClient, updatedClient.ID); err != nil { log.Error(err, "unable to fetch available jobs for azurehound") @@ -134,10 +150,11 @@ func start(ctx context.Context) { } else { // Notify BHE instance of job start + mutex.Lock() currentJob = &executableJobs[0] + mutex.Unlock() if err := startJob(ctx, *bheInstance, bheClient, currentJob.ID); err != nil { log.Error(err, "failed to start job, will retry on next heartbeat") - currentJob = nil return } @@ -161,8 +178,6 @@ func start(ctx context.Context) { } else { log.Info(message, "id", currentJob.ID, "duration", duration.String()) } - - currentJob = nil } } }() From 0d4df1afcb94a7af6cae9554f4fa732bbb2eef66 Mon Sep 17 00:00:00 2001 From: Reuben Lifshay Date: Mon, 9 Oct 2023 16:30:33 -0700 Subject: [PATCH 2/3] fix: update job handling to properly use synchronization primitives --- cmd/start.go | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/cmd/start.go b/cmd/start.go index ab62d54..8fc2e38 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -33,6 +33,7 @@ import ( "runtime" "sort" "sync" + "sync/atomic" "time" "github.com/bloodhoundad/azurehound/v2/client/rest" @@ -98,34 +99,21 @@ func start(ctx context.Context) { defer ticker.Stop() var ( - currentJob *models.ClientJob - jobQueued bool - mutex sync.Mutex + jobQueued sync.Mutex + currentJobID atomic.Int64 ) for { - mutex.Lock() - loopJobQueued := jobQueued - loopJobExists := currentJob != nil - mutex.Unlock() select { case <-ticker.C: - if loopJobExists { - log.V(1).Info("collection in progress...", "jobId", currentJob.ID) + if jobID := currentJobID.Load(); jobID != 0 { + log.V(1).Info("collection in progress...", "jobId", jobID) if err := checkin(ctx, *bheInstance, bheClient); err != nil { log.Error(err, "bloodhound enterprise service checkin failed") } - } else if !loopJobQueued { - mutex.Lock() - jobQueued = true - mutex.Unlock() + } else if jobQueued.TryLock() { go func() { - defer (func() { - mutex.Lock() - jobQueued = false - currentJob = nil - mutex.Unlock() - })() + defer jobQueued.Unlock() log.V(2).Info("checking for available collection jobs") if jobs, err := getAvailableJobs(ctx, *bheInstance, bheClient, updatedClient.ID); err != nil { log.Error(err, "unable to fetch available jobs for azurehound") @@ -148,12 +136,11 @@ func start(ctx context.Context) { if len(executableJobs) == 0 { log.V(2).Info("there are no jobs for azurehound to complete at this time") } else { - + defer currentJobID.Store(0) + queuedJobID := executableJobs[0].ID + currentJobID.Store(int64(queuedJobID)) // Notify BHE instance of job start - mutex.Lock() - currentJob = &executableJobs[0] - mutex.Unlock() - if err := startJob(ctx, *bheInstance, bheClient, currentJob.ID); err != nil { + if err := startJob(ctx, *bheInstance, bheClient, queuedJobID); err != nil { log.Error(err, "failed to start job, will retry on next heartbeat") return } @@ -176,7 +163,7 @@ func start(ctx context.Context) { if err := endJob(ctx, *bheInstance, bheClient, models.JobStatusComplete, message); err != nil { log.Error(err, "failed to end job") } else { - log.Info(message, "id", currentJob.ID, "duration", duration.String()) + log.Info(message, "id", queuedJobID, "duration", duration.String()) } } } From fcdde247681385768606c5eb27612311596e6030 Mon Sep 17 00:00:00 2001 From: Dillon Lees Date: Wed, 11 Oct 2023 15:56:55 -0400 Subject: [PATCH 3/3] chore: update dockerfile base version --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 2460562..2a6559e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1 -FROM golang:1.18 as build +FROM golang:1.20 as build WORKDIR /app ARG VERSION=v0.0.0