Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/Implement connectivity map #259

Merged
merged 29 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8061068
feat: implement spartial index query based on google::s2
CodeBear801 Mar 24, 2020
8f9e868
fix: remove hard code value.
CodeBear801 Mar 24, 2020
459b170
fix: refactor code.
CodeBear801 Mar 25, 2020
cb08f93
fix: remove local changes.
CodeBear801 Mar 25, 2020
6de90a8
fix: update comments
CodeBear801 Mar 25, 2020
4dcf8f0
fix: add fake main to pass ci.
CodeBear801 Mar 25, 2020
240bb5b
fix: fix issue in unit test. Build()'s result no need to guarantee t…
CodeBear801 Mar 25, 2020
8270572
fix: adjust code based on review.
CodeBear801 Mar 26, 2020
705baec
feat: implement feature of place connectivity
CodeBear801 Mar 27, 2020
71f8c62
fix: rebase master into local.
CodeBear801 Mar 30, 2020
7432b79
fix: fix compilation issue.
CodeBear801 Mar 30, 2020
5625883
fix: implement logic for ranker.
CodeBear801 Mar 30, 2020
3d0d374
fix: add unit tests for simple ranker
CodeBear801 Mar 30, 2020
9f3c40d
fix: add unit tests for functions
CodeBear801 Mar 30, 2020
b6ba5c6
fix: add unit test for shortest_path_impl
CodeBear801 Mar 31, 2020
8fa567e
fix: update code based on golang-lint's suggestions.
CodeBear801 Mar 31, 2020
1abfe70
fix: adjust comments
CodeBear801 Mar 31, 2020
7c8815b
fix: remove placeconnectivitymap and will keep it for next commit.
CodeBear801 Mar 31, 2020
367138a
fix: adjust names
CodeBear801 Mar 31, 2020
05261ba
fix: update comment.
CodeBear801 Mar 31, 2020
f430853
feat: implement connectivitymap.
CodeBear801 Apr 1, 2020
7da9a68
fit: implment builder with taskspool
CodeBear801 Apr 1, 2020
f0b5886
fix: update code based on comments
CodeBear801 Apr 1, 2020
1063ad7
feat: implement connectivity map.
CodeBear801 Apr 1, 2020
bd4ba15
fix: update code based on self review
CodeBear801 Apr 1, 2020
abad5aa
fix: update comments.
CodeBear801 Apr 1, 2020
4c65cb0
fix: adjust code based on comments.
CodeBear801 Apr 2, 2020
3a9474f
fix: rafactor builder, remove unnecessary dispatcher.
CodeBear801 Apr 3, 2020
f55ea36
fix: revert back ranker's change
CodeBear801 Apr 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions integration/service/connectivitymap/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package connectivitymap

import (
"sync"

"github.com/Telenav/osrm-backend/integration/service/spatialindexer"
"github.com/golang/glog"
)

type connectivityMapBuilder struct {
iterator spatialindexer.PointsIterator
finder spatialindexer.Finder
ranker spatialindexer.Ranker
distanceLimit float64
id2NearbyIDs ID2NearByIDsMap

numOfWorker int
workerWaitGroup *sync.WaitGroup
aggregatorWaitGroup *sync.WaitGroup
aggregatorC chan placeIDWithNearByPlaceIDs
}

func newConnectivityMapBuilder(iterator spatialindexer.PointsIterator, finder spatialindexer.Finder,
ranker spatialindexer.Ranker, distanceLimit float64, numOfWorker int) *connectivityMapBuilder {
builder := &connectivityMapBuilder{
iterator: iterator,
finder: finder,
ranker: ranker,
distanceLimit: distanceLimit,
id2NearbyIDs: make(ID2NearByIDsMap),

numOfWorker: numOfWorker,
workerWaitGroup: &sync.WaitGroup{},
aggregatorWaitGroup: &sync.WaitGroup{},
aggregatorC: make(chan placeIDWithNearByPlaceIDs, 10000),
}

if numOfWorker < 1 {
glog.Fatal("numOfWorker should never be smaller than 1, recommend using NumCPU()\n")
}

return builder
}

/*
-> worker (fetch task -> find -> rank)
/ \
/ \
Input Iterator(channel) ---> worker (fetch task -> find -> rank) ---> aggregatorChannel -> feed to map
\ /
\ /
-> worker (fetch task -> find -> rank)
*/

func (builder *connectivityMapBuilder) build() ID2NearByIDsMap {
builder.process()
builder.aggregate()
builder.wait()

return builder.id2NearbyIDs
}

func (builder *connectivityMapBuilder) process() {
inputC := builder.iterator.IteratePoints()

for i := 0; i < builder.numOfWorker; i++ {
builder.workerWaitGroup.Add(1)
go builder.work(i, inputC, builder.aggregatorC)
}

glog.Infof("builder start number of %d workers.\n", builder.numOfWorker)
}

func (builder *connectivityMapBuilder) work(workerID int, source <-chan spatialindexer.PointInfo, sink chan<- placeIDWithNearByPlaceIDs) {
defer builder.workerWaitGroup.Done()

counter := 0
for p := range source {
counter += 1
nearbyIDs := builder.finder.FindNearByPointIDs(p.Location, builder.distanceLimit, spatialindexer.UnlimitedCount)
rankedResults := builder.ranker.RankPointIDsByGreatCircleDistance(p.Location, nearbyIDs)

ids := make([]IDAndDistance, 0, len(rankedResults))
for _, r := range rankedResults {
ids = append(ids, IDAndDistance{
ID: r.ID,
Distance: r.Distance,
})
}

sink <- placeIDWithNearByPlaceIDs{
id: p.ID,
ids: ids,
}
}

glog.Infof("Worker_%d finished handling %d tasks.\n", workerID, counter)
}

func (builder *connectivityMapBuilder) aggregate() {
builder.aggregatorWaitGroup.Add(1)

go func() {
counter := 0
for item := range builder.aggregatorC {
counter += 1
builder.id2NearbyIDs[item.id] = item.ids
}

glog.Infof("Aggregation is finished with handling %d items.\n", counter)
builder.aggregatorWaitGroup.Done()
}()
}

func (builder *connectivityMapBuilder) wait() {
builder.workerWaitGroup.Wait()
close(builder.aggregatorC)
builder.aggregatorWaitGroup.Wait()
}

type placeIDWithNearByPlaceIDs struct {
id spatialindexer.PointID
ids []IDAndDistance
}

func (builder *connectivityMapBuilder) buildInSerial() ID2NearByIDsMap {
glog.Warning("This function is only used for compare result of worker's build().\n`")
internalResult := make(chan placeIDWithNearByPlaceIDs, 10000)
m := make(ID2NearByIDsMap)

go func() {
for p := range builder.iterator.IteratePoints() {
nearbyIDs := builder.finder.FindNearByPointIDs(p.Location, builder.distanceLimit, spatialindexer.UnlimitedCount)
rankedResults := builder.ranker.RankPointIDsByGreatCircleDistance(p.Location, nearbyIDs)

ids := make([]IDAndDistance, 0, len(rankedResults))
for _, r := range rankedResults {
ids = append(ids, IDAndDistance{
ID: r.ID,
Distance: r.Distance,
})
}
internalResult <- placeIDWithNearByPlaceIDs{
id: p.ID,
ids: ids,
}
}
close(internalResult)
}()

for item := range internalResult {
m[item.id] = item.ids
}

return m
}
101 changes: 101 additions & 0 deletions integration/service/connectivitymap/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package connectivitymap

import (
"reflect"
"runtime"
"testing"

"github.com/Telenav/osrm-backend/integration/service/spatialindexer"
"github.com/Telenav/osrm-backend/integration/service/spatialindexer/ranker"
)

// Mock env:
// Iterator returns 100 of fixed location point(IDs are 1000, 1001, 1002, ...)
// Finder returns fixed array result(10 points, IDs are 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 )
// Ranker use great circle distance to calculate
// Expect result:
// build() generate same result as pre-calculated map
// map[1000] = {results of 10 points}
// map[1001] = {results of 10 points}
// ...
// map[1099] = {results of 10 points}
func TestBuilderWithMockIteratorAndFinder(t *testing.T) {
builder := newConnectivityMapBuilder(&spatialindexer.MockOneHundredPointsIterator{},
&spatialindexer.MockFinder{},
ranker.CreateRanker(ranker.SimpleRanker, nil),
100,
runtime.NumCPU())

actual := builder.build()

// construct expect map
expect := make(ID2NearByIDsMap)

var idAndDistanceArray = []IDAndDistance{
IDAndDistance{
ID: 3,
Distance: 345.220003472554,
},
IDAndDistance{
ID: 2,
Distance: 402.8536530341791,
},
IDAndDistance{
ID: 4,
Distance: 1627.1858848458571,
},
IDAndDistance{
ID: 5,
Distance: 4615.586636153461,
},
IDAndDistance{
ID: 1,
Distance: 5257.70008125706,
},
IDAndDistance{
ID: 6,
Distance: 6888.7486674247,
},
IDAndDistance{
ID: 7,
Distance: 7041.893747628621,
},
IDAndDistance{
ID: 10,
Distance: 8622.213424347745,
},
IDAndDistance{
ID: 9,
Distance: 9438.804320070916,
},
IDAndDistance{
ID: 8,
Distance: 9897.44482638937,
},
}

for i := 0; i < 100; i++ {
index := i + 1000
expect[(spatialindexer.PointID(index))] = idAndDistanceArray
}

if !reflect.DeepEqual(actual, expect) {
t.Errorf("Failed to pass TestBuilder with mock data, \nexpect \n%+v\n but got \n%v\n", expect, actual)
}
}

func TestBuilderWithSingleWorker(t *testing.T) {
builder := newConnectivityMapBuilder(&spatialindexer.MockOneHundredPointsIterator{},
&spatialindexer.MockFinder{},
ranker.CreateRanker(ranker.SimpleRanker, nil),
100,
runtime.NumCPU())

actual := builder.build()

expect := builder.buildInSerial()

if !reflect.DeepEqual(actual, expect) {
t.Errorf("Failed to pass TestBuilder with mock data, \nexpect \n%+v\n but got \n%v\n", expect, actual)
}
}
54 changes: 54 additions & 0 deletions integration/service/connectivitymap/connectivity_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package connectivitymap

import (
CodeBear801 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/Telenav/osrm-backend/integration/service/spatialindexer"
"github.com/golang/glog"
)

// IDAndDistance wraps ID and distance information
type IDAndDistance struct {
ID spatialindexer.PointID
Distance float64
}

// ID2NearByIDsMap is a mapping between ID and its nearby IDs
type ID2NearByIDsMap map[spatialindexer.PointID][]IDAndDistance

// Connectivity Map used to query connectivity for given placeID
type ConnectivityMap struct {
id2nearByIDs ID2NearByIDsMap
maxRange float64
statistic *statistic
}

// NewConnectivityMap creates ConnectivityMap
func NewConnectivityMap(maxRange float64) *ConnectivityMap {
return &ConnectivityMap{
maxRange: maxRange,
statistic: newStatistic(),
}
}

// Build creates ConnectivityMap
func (cm *ConnectivityMap) Build() {
glog.Info("Successfully finished GenerateConnectivityMap\n")
}

// Dump dump ConnectivityMap's content to given folderPath
func (cm *ConnectivityMap) Dump(folderPath string) {
}

// Load rebuild ConnectivityMap from dumpped data in given folderPath
func (cm *ConnectivityMap) Load(folderPath string) {
}

// QueryConnectivity answers connectivity query for given placeInfo
func (cm *ConnectivityMap) QueryConnectivity(placeInfo spatialindexer.PointInfo, limitDistance float64) {
// for each everything recorded in data, apply limit option on that
}

// MaxRange tells the value used to pre-process place data.
// MaxRange means the maximum distance in meters could be reached from current location.
func (cm *ConnectivityMap) MaxRange() float64 {
return cm.maxRange
}
66 changes: 66 additions & 0 deletions integration/service/connectivitymap/connectivity_map_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package connectivitymap

var fakeID2NearByIDsMap1 = ID2NearByIDsMap{
1: []IDAndDistance{
IDAndDistance{
ID: 2,
Distance: 3,
},
IDAndDistance{
ID: 5,
Distance: 4,
},
IDAndDistance{
ID: 7,
Distance: 6,
},
IDAndDistance{
ID: 8,
Distance: 12,
},
},

2: []IDAndDistance{
IDAndDistance{
ID: 1,
Distance: 3,
},
IDAndDistance{
ID: 7,
Distance: 23,
},
},

5: []IDAndDistance{
IDAndDistance{
ID: 1,
Distance: 4,
},
IDAndDistance{
ID: 8,
Distance: 5,
},
},

7: []IDAndDistance{
IDAndDistance{
ID: 1,
Distance: 6,
},
IDAndDistance{
ID: 2,
Distance: 23,
},
},

8: []IDAndDistance{
IDAndDistance{
ID: 5,
Distance: 5,
},
IDAndDistance{
ID: 1,
Distance: 12,
},
},
}
Loading