diff --git a/query/worker_test.go b/query/worker_test.go index 287ba891..bcefbae7 100644 --- a/query/worker_test.go +++ b/query/worker_test.go @@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) { subscriptions: make(chan chan wire.Message), quit: make(chan struct{}), } - results := make(chan *jobResult) + results := make(chan *jobResult, maxJobs) quit := make(chan struct{}) wk := NewWorker(peer) diff --git a/query/workmanager.go b/query/workmanager.go index 9d6317fb..c616a48b 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -15,6 +15,9 @@ const ( // maxQueryTimeout is the maximum timeout given to a single query. maxQueryTimeout = 32 * time.Second + + // maxJobs is the maximum amount of jobs a single worker can have. + maxJobs = 10 ) var ( @@ -74,11 +77,10 @@ type PeerRanking interface { // activeWorker wraps a Worker that is currently running, together with the job // we have given to it. -// TODO(halseth): support more than one active job at a time. type activeWorker struct { - w Worker + w Worker activeJobs map[uint64]*queryJob - onExit chan struct{} + onExit chan struct{} } // Config holds the configuration options for a new WorkManager. @@ -126,8 +128,8 @@ var _ WorkManager = (*peerWorkManager)(nil) func NewWorkManager(cfg *Config) WorkManager { return &peerWorkManager{ cfg: cfg, - newBatches: make(chan *batch), - jobResults: make(chan *jobResult), + newBatches: make(chan *batch, maxJobs), + jobResults: make(chan *jobResult, maxJobs), quit: make(chan struct{}), } } @@ -220,7 +222,7 @@ Loop: for p, r := range workers { // Only one active job at a time is currently // supported. - if len(r.activeJobs) >= 1 { + if len(r.activeJobs) >= maxJobs { continue } diff --git a/query/workmanager_test.go b/query/workmanager_test.go index c88b2f18..0ff0c72d 100644 --- a/query/workmanager_test.go +++ b/query/workmanager_test.go @@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager, NewWorker: func(peer Peer) Worker { m := &mockWorker{ peer: peer, - nextJob: make(chan *queryJob), - results: make(chan *jobResult), + nextJob: make(chan *queryJob, maxJobs), + results: make(chan *jobResult, maxJobs), } workerChan <- m return m @@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { for i := 0; i < numQueries; i++ { q := &Request{} queries[i] = q - scheduledJobs[i] = make(chan sched) + scheduledJobs[i] = make(chan sched, maxJobs) } // For each worker, spin up a goroutine that will forward the job it @@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) { // TestWorkManagerWorkRankingScheduling checks that the work manager schedules // jobs among workers according to the peer ranking. func TestWorkManagerWorkRankingScheduling(t *testing.T) { - const numQueries = 4 + const numQueries = 40 const numWorkers = 8 workMgr, workers := startWorkManager(t, numWorkers) @@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { var jobs []*queryJob for i := 0; i < numQueries; i++ { select { - case job := <-workers[i].nextJob: + case job := <-workers[i/10].nextJob: if job.index != uint64(i) { t.Fatalf("unexpected job") } @@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { // Go backwards, and succeed the queries. for i := numQueries - 1; i >= 0; i-- { select { - case workers[i].results <- &jobResult{ + case workers[i/10].results <- &jobResult{ job: jobs[i], err: nil, }: