Skip to content

Commit

Permalink
Feature/Implement connectivity map (#259)
Browse files Browse the repository at this point in the history
* feat: implement feature of place connectivity and builder with taskpool.
issue: #238
  • Loading branch information
CodeBear801 authored Apr 7, 2020
1 parent 914d13c commit 2b28e5b
Show file tree
Hide file tree
Showing 15 changed files with 925 additions and 11 deletions.
156 changes: 156 additions & 0 deletions integration/service/connectivitymap/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package connectivitymap

import (
"sync"

"github.com/Telenav/osrm-backend/integration/service/spatialindexer"
"github.com/golang/glog"
)

type connectivityMapBuilder struct {
iterator spatialindexer.PointsIterator
finder spatialindexer.Finder
ranker spatialindexer.Ranker
distanceLimit float64
id2NearbyIDs ID2NearByIDsMap

numOfWorker int
workerWaitGroup *sync.WaitGroup
aggregatorWaitGroup *sync.WaitGroup
aggregatorC chan placeIDWithNearByPlaceIDs
}

func newConnectivityMapBuilder(iterator spatialindexer.PointsIterator, finder spatialindexer.Finder,
ranker spatialindexer.Ranker, distanceLimit float64, numOfWorker int) *connectivityMapBuilder {
builder := &connectivityMapBuilder{
iterator: iterator,
finder: finder,
ranker: ranker,
distanceLimit: distanceLimit,
id2NearbyIDs: make(ID2NearByIDsMap),

numOfWorker: numOfWorker,
workerWaitGroup: &sync.WaitGroup{},
aggregatorWaitGroup: &sync.WaitGroup{},
aggregatorC: make(chan placeIDWithNearByPlaceIDs, 10000),
}

if numOfWorker < 1 {
glog.Fatal("numOfWorker should never be smaller than 1, recommend using NumCPU()\n")
}

return builder
}

/*
-> worker (fetch task -> find -> rank)
/ \
/ \
Input Iterator(channel) ---> worker (fetch task -> find -> rank) ---> aggregatorChannel -> feed to map
\ /
\ /
-> worker (fetch task -> find -> rank)
*/

func (builder *connectivityMapBuilder) build() ID2NearByIDsMap {
builder.process()
builder.aggregate()
builder.wait()

return builder.id2NearbyIDs
}

func (builder *connectivityMapBuilder) process() {
inputC := builder.iterator.IteratePoints()

for i := 0; i < builder.numOfWorker; i++ {
builder.workerWaitGroup.Add(1)
go builder.work(i, inputC, builder.aggregatorC)
}

glog.Infof("builder start number of %d workers.\n", builder.numOfWorker)
}

func (builder *connectivityMapBuilder) work(workerID int, source <-chan spatialindexer.PointInfo, sink chan<- placeIDWithNearByPlaceIDs) {
defer builder.workerWaitGroup.Done()

counter := 0
for p := range source {
counter += 1
nearbyIDs := builder.finder.FindNearByPointIDs(p.Location, builder.distanceLimit, spatialindexer.UnlimitedCount)
rankedResults := builder.ranker.RankPointIDsByGreatCircleDistance(p.Location, nearbyIDs)

ids := make([]IDAndDistance, 0, len(rankedResults))
for _, r := range rankedResults {
ids = append(ids, IDAndDistance{
ID: r.ID,
Distance: r.Distance,
})
}

sink <- placeIDWithNearByPlaceIDs{
id: p.ID,
ids: ids,
}
}

glog.Infof("Worker_%d finished handling %d tasks.\n", workerID, counter)
}

func (builder *connectivityMapBuilder) aggregate() {
builder.aggregatorWaitGroup.Add(1)

go func() {
counter := 0
for item := range builder.aggregatorC {
counter += 1
builder.id2NearbyIDs[item.id] = item.ids
}

glog.Infof("Aggregation is finished with handling %d items.\n", counter)
builder.aggregatorWaitGroup.Done()
}()
}

func (builder *connectivityMapBuilder) wait() {
builder.workerWaitGroup.Wait()
close(builder.aggregatorC)
builder.aggregatorWaitGroup.Wait()
}

type placeIDWithNearByPlaceIDs struct {
id spatialindexer.PointID
ids []IDAndDistance
}

func (builder *connectivityMapBuilder) buildInSerial() ID2NearByIDsMap {
glog.Warning("This function is only used for compare result of worker's build().\n`")
internalResult := make(chan placeIDWithNearByPlaceIDs, 10000)
m := make(ID2NearByIDsMap)

go func() {
for p := range builder.iterator.IteratePoints() {
nearbyIDs := builder.finder.FindNearByPointIDs(p.Location, builder.distanceLimit, spatialindexer.UnlimitedCount)
rankedResults := builder.ranker.RankPointIDsByGreatCircleDistance(p.Location, nearbyIDs)

ids := make([]IDAndDistance, 0, len(rankedResults))
for _, r := range rankedResults {
ids = append(ids, IDAndDistance{
ID: r.ID,
Distance: r.Distance,
})
}
internalResult <- placeIDWithNearByPlaceIDs{
id: p.ID,
ids: ids,
}
}
close(internalResult)
}()

for item := range internalResult {
m[item.id] = item.ids
}

return m
}
101 changes: 101 additions & 0 deletions integration/service/connectivitymap/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package connectivitymap

import (
"reflect"
"runtime"
"testing"

"github.com/Telenav/osrm-backend/integration/service/spatialindexer"
"github.com/Telenav/osrm-backend/integration/service/spatialindexer/ranker"
)

// Mock env:
// Iterator returns 100 of fixed location point(IDs are 1000, 1001, 1002, ...)
// Finder returns fixed array result(10 points, IDs are 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 )
// Ranker use great circle distance to calculate
// Expect result:
// build() generate same result as pre-calculated map
// map[1000] = {results of 10 points}
// map[1001] = {results of 10 points}
// ...
// map[1099] = {results of 10 points}
func TestBuilderWithMockIteratorAndFinder(t *testing.T) {
builder := newConnectivityMapBuilder(&spatialindexer.MockOneHundredPointsIterator{},
&spatialindexer.MockFinder{},
ranker.CreateRanker(ranker.SimpleRanker, nil),
100,
runtime.NumCPU())

actual := builder.build()

// construct expect map
expect := make(ID2NearByIDsMap)

var idAndDistanceArray = []IDAndDistance{
IDAndDistance{
ID: 3,
Distance: 345.220003472554,
},
IDAndDistance{
ID: 2,
Distance: 402.8536530341791,
},
IDAndDistance{
ID: 4,
Distance: 1627.1858848458571,
},
IDAndDistance{
ID: 5,
Distance: 4615.586636153461,
},
IDAndDistance{
ID: 1,
Distance: 5257.70008125706,
},
IDAndDistance{
ID: 6,
Distance: 6888.7486674247,
},
IDAndDistance{
ID: 7,
Distance: 7041.893747628621,
},
IDAndDistance{
ID: 10,
Distance: 8622.213424347745,
},
IDAndDistance{
ID: 9,
Distance: 9438.804320070916,
},
IDAndDistance{
ID: 8,
Distance: 9897.44482638937,
},
}

for i := 0; i < 100; i++ {
index := i + 1000
expect[(spatialindexer.PointID(index))] = idAndDistanceArray
}

if !reflect.DeepEqual(actual, expect) {
t.Errorf("Failed to pass TestBuilder with mock data, \nexpect \n%+v\n but got \n%v\n", expect, actual)
}
}

func TestBuilderWithSingleWorker(t *testing.T) {
builder := newConnectivityMapBuilder(&spatialindexer.MockOneHundredPointsIterator{},
&spatialindexer.MockFinder{},
ranker.CreateRanker(ranker.SimpleRanker, nil),
100,
runtime.NumCPU())

actual := builder.build()

expect := builder.buildInSerial()

if !reflect.DeepEqual(actual, expect) {
t.Errorf("Failed to pass TestBuilder with mock data, \nexpect \n%+v\n but got \n%v\n", expect, actual)
}
}
54 changes: 54 additions & 0 deletions integration/service/connectivitymap/connectivity_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package connectivitymap

import (
"github.com/Telenav/osrm-backend/integration/service/spatialindexer"
"github.com/golang/glog"
)

// IDAndDistance wraps ID and distance information
type IDAndDistance struct {
ID spatialindexer.PointID
Distance float64
}

// ID2NearByIDsMap is a mapping between ID and its nearby IDs
type ID2NearByIDsMap map[spatialindexer.PointID][]IDAndDistance

// Connectivity Map used to query connectivity for given placeID
type ConnectivityMap struct {
id2nearByIDs ID2NearByIDsMap
maxRange float64
statistic *statistic
}

// NewConnectivityMap creates ConnectivityMap
func NewConnectivityMap(maxRange float64) *ConnectivityMap {
return &ConnectivityMap{
maxRange: maxRange,
statistic: newStatistic(),
}
}

// Build creates ConnectivityMap
func (cm *ConnectivityMap) Build() {
glog.Info("Successfully finished GenerateConnectivityMap\n")
}

// Dump dump ConnectivityMap's content to given folderPath
func (cm *ConnectivityMap) Dump(folderPath string) {
}

// Load rebuild ConnectivityMap from dumpped data in given folderPath
func (cm *ConnectivityMap) Load(folderPath string) {
}

// QueryConnectivity answers connectivity query for given placeInfo
func (cm *ConnectivityMap) QueryConnectivity(placeInfo spatialindexer.PointInfo, limitDistance float64) {
// for each everything recorded in data, apply limit option on that
}

// MaxRange tells the value used to pre-process place data.
// MaxRange means the maximum distance in meters could be reached from current location.
func (cm *ConnectivityMap) MaxRange() float64 {
return cm.maxRange
}
66 changes: 66 additions & 0 deletions integration/service/connectivitymap/connectivity_map_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package connectivitymap

var fakeID2NearByIDsMap1 = ID2NearByIDsMap{
1: []IDAndDistance{
IDAndDistance{
ID: 2,
Distance: 3,
},
IDAndDistance{
ID: 5,
Distance: 4,
},
IDAndDistance{
ID: 7,
Distance: 6,
},
IDAndDistance{
ID: 8,
Distance: 12,
},
},

2: []IDAndDistance{
IDAndDistance{
ID: 1,
Distance: 3,
},
IDAndDistance{
ID: 7,
Distance: 23,
},
},

5: []IDAndDistance{
IDAndDistance{
ID: 1,
Distance: 4,
},
IDAndDistance{
ID: 8,
Distance: 5,
},
},

7: []IDAndDistance{
IDAndDistance{
ID: 1,
Distance: 6,
},
IDAndDistance{
ID: 2,
Distance: 23,
},
},

8: []IDAndDistance{
IDAndDistance{
ID: 5,
Distance: 5,
},
IDAndDistance{
ID: 1,
Distance: 12,
},
},
}
Loading

0 comments on commit 2b28e5b

Please sign in to comment.