Skip to content

Commit

Permalink
fix: rafactor builder, remove unnecessary dispatcher.
Browse files Browse the repository at this point in the history
issue: #238
  • Loading branch information
CodeBear801 committed Apr 3, 2020
1 parent 4c65cb0 commit 3a9474f
Showing 1 changed file with 14 additions and 41 deletions.
55 changes: 14 additions & 41 deletions integration/service/connectivitymap/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ type connectivityMapBuilder struct {
id2NearbyIDs ID2NearByIDsMap

numOfWorker int
dispatchWaitGroup *sync.WaitGroup
workerWaitGroup *sync.WaitGroup
aggregatorWaitGroup *sync.WaitGroup
tasks4WorkerC []chan spatialindexer.PointInfo
aggregatorC chan placeIDWithNearByPlaceIDs
}

Expand All @@ -32,79 +30,55 @@ func newConnectivityMapBuilder(iterator spatialindexer.PointsIterator, finder sp
id2NearbyIDs: make(ID2NearByIDsMap),

numOfWorker: numOfWorker,
dispatchWaitGroup: &sync.WaitGroup{},
workerWaitGroup: &sync.WaitGroup{},
aggregatorWaitGroup: &sync.WaitGroup{},
tasks4WorkerC: make([]chan spatialindexer.PointInfo, numOfWorker),
aggregatorC: make(chan placeIDWithNearByPlaceIDs, 10000),
}

if numOfWorker < 1 {
glog.Fatal("numOfWorker should never be smaller than 1, recommend using NumCPU()\n")
}

for i := range builder.tasks4WorkerC {
builder.tasks4WorkerC[i] = make(chan spatialindexer.PointInfo, 50)
}

return builder
}

/*
-> tasksChannel ---> worker
/ \
/ \
Input Iterator ==> dispatcher ---> tasksChannel ---> worker ---> aggregatorChannel -> iterate and put result to map
\ /
\ /
-> tasksChannel ---> worker
-> worker (fetch task -> find -> rank)
/ \
/ \
Input Iterator(channel) ---> worker (fetch task -> find -> rank) ---> aggregatorChannel -> feed to map
\ /
\ /
-> worker (fetch task -> find -> rank)
*/

func (builder *connectivityMapBuilder) build() ID2NearByIDsMap {
builder.dispatch()
builder.process()
builder.aggregate()
builder.wait()

return builder.id2NearbyIDs
}

func (builder *connectivityMapBuilder) dispatch() {
builder.dispatchWaitGroup.Add(1)

go func() {
counter := 0
for p := range builder.iterator.IteratePoints() {
builder.tasks4WorkerC[counter%builder.numOfWorker] <- p
counter += 1
}

for i := 0; i < builder.numOfWorker; i++ {
close(builder.tasks4WorkerC[i])
}

glog.Infof("builder's dispatch is finished. Total input is %d.\n", counter)
builder.dispatchWaitGroup.Done()
}()
}

func (builder *connectivityMapBuilder) process() {
inputC := builder.iterator.IteratePoints()

for i := 0; i < builder.numOfWorker; i++ {
builder.workerWaitGroup.Add(1)
go builder.work(builder.workerWaitGroup, i, builder.tasks4WorkerC[i], builder.aggregatorC)
go builder.work(i, inputC, builder.aggregatorC)
}

glog.Infof("builder's process is finished, start number of %d workers.\n", builder.numOfWorker)
glog.Infof("builder start number of %d workers.\n", builder.numOfWorker)
}

func (builder *connectivityMapBuilder) work(wg *sync.WaitGroup, workerID int, source <-chan spatialindexer.PointInfo, sink chan<- placeIDWithNearByPlaceIDs) {
defer wg.Done()
func (builder *connectivityMapBuilder) work(workerID int, source <-chan spatialindexer.PointInfo, sink chan<- placeIDWithNearByPlaceIDs) {
defer builder.workerWaitGroup.Done()

counter := 0
for p := range source {
counter += 1
nearbyIDs := builder.finder.FindNearByPointIDs(p.Location, builder.distanceLimit, spatialindexer.UnlimitedCount)
rankedResults := builder.ranker.RankPointIDsByGreatCircleDistance(p.Location, nearbyIDs)
rankedResults := builder.ranker.RankPointIDsByShortestDistance(p.Location, nearbyIDs)

ids := make([]IDAndDistance, 0, len(rankedResults))
for _, r := range rankedResults {
Expand Down Expand Up @@ -139,7 +113,6 @@ func (builder *connectivityMapBuilder) aggregate() {
}

func (builder *connectivityMapBuilder) wait() {
builder.dispatchWaitGroup.Wait()
builder.workerWaitGroup.Wait()
close(builder.aggregatorC)
builder.aggregatorWaitGroup.Wait()
Expand Down

0 comments on commit 3a9474f

Please sign in to comment.