Skip to content

Commit

Permalink
feat: add segments to Flow to update traffic on part of way
Browse files Browse the repository at this point in the history
  • Loading branch information
serho committed May 4, 2020
1 parent 9812703 commit ae0438a
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 107 deletions.
2 changes: 1 addition & 1 deletion integration/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ cmd/nodes2way-cli/nodes2way-cli
cmd/trafficproxy-cli/*_flows.csv
cmd/trafficproxy-cli/*_incidents.csv
cmd/historicalspeed-timezone-builder/*mapping.csv
cmd/osrm-traffic-updater/testdata/target.csv
cmd/osrm-traffic-updater/testdata/target*.csv
10 changes: 6 additions & 4 deletions integration/cmd/osrm-traffic-updater/osrm_traffic_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ func main() {

isFlowDoneChan := make(chan bool, 1)
wayid2speed := make(map[int64]int)
segmentsOfWay := make(map[int64][]*trafficproxy.SegmentedFlow)

go func() {
trafficData, err := trafficproxyclient.GetFlowsIncidents(nil)
if err != nil {
log.Println(err)
isFlowDoneChan <- false
return
}

trafficData2map(*trafficData, wayid2speed)
trafficData2map(*trafficData, wayid2speed, segmentsOfWay)
isFlowDoneChan <- true
}()

Expand All @@ -60,7 +61,7 @@ func main() {
if isFlowDone {
var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, sources, flags.csvFile, &ds)
dumpSpeedTable4Customize(wayid2speed, segmentsOfWay, sources, flags.csvFile, &ds)
ds.Output()
}
}
Expand All @@ -83,7 +84,7 @@ loop:
return isFlowDone
}

func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int) {
func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int, s map[int64][]*trafficproxy.SegmentedFlow) {
startTime := time.Now()
defer func() {
log.Printf("Processing time for building traffic map takes %f seconds\n", time.Now().Sub(startTime).Seconds())
Expand All @@ -102,6 +103,7 @@ func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int)

wayid := flow.Flow.WayID
m[wayid] = int(flow.Flow.Speed)
s[wayid] = flow.Flow.SegmentedFlow

if wayid > 0 {
fwdCnt++
Expand Down
46 changes: 38 additions & 8 deletions integration/cmd/osrm-traffic-updater/speed_table_dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ import (
"bufio"
"fmt"
"log"
"math"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/Telenav/osrm-backend/integration/traffic/livetraffic/trafficproxy"
)

var tasksWg sync.WaitGroup
var dumpFinishedWg sync.WaitGroup

func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan string,
func dumpSpeedTable4Customize(wayid2speed map[int64]int, segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow, sources [TASKNUM]chan string,
outputPath string, ds *dumperStatistic) {
startTime := time.Now()

Expand All @@ -23,19 +26,19 @@ func dumpSpeedTable4Customize(wayid2speed map[int64]int, sources [TASKNUM]chan s
}

sink := make(chan string)
startTasks(wayid2speed, sources, sink, ds)
startTasks(wayid2speed, segmentsOfWay, sources, sink, ds)
startDump(outputPath, sink)
wait4AllTasksFinished(sink, ds)

endTime := time.Now()
fmt.Printf("Processing time for dumpSpeedTable4Customize takes %f seconds\n", endTime.Sub(startTime).Seconds())
}

func startTasks(wayid2speed map[int64]int, sources [TASKNUM]chan string,
func startTasks(wayid2speed map[int64]int, segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow, sources [TASKNUM]chan string,
sink chan<- string, ds *dumperStatistic) {
tasksWg.Add(TASKNUM)
for i := 0; i < TASKNUM; i++ {
go task(wayid2speed, sources[i], sink, ds)
go task(wayid2speed, segmentsOfWay, sources[i], sink, ds)
}
}

Expand All @@ -51,13 +54,15 @@ func wait4AllTasksFinished(sink chan string, ds *dumperStatistic) {
dumpFinishedWg.Wait()
}

func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, ds *dumperStatistic) {
func task(wayid2speed map[int64]int, segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow, source <-chan string, sink chan<- string, ds *dumperStatistic) {
var wayCnt, nodeCnt, fwdRecordCnt, bwdRecordCnt, wayMatched, nodeMatched, fwdTrafficMatched, bwdTrafficMatched uint64
var err error
for str := range source {
elements := strings.Split(str, ",")
wayCnt += 1
nodeCnt += (uint64)(len(elements) - 1)
nodesInWayCnt := (uint64)(len(elements) - 1)
nodeCnt += nodesInWayCnt

if len(elements) < 3 {
continue
}
Expand All @@ -71,6 +76,20 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
speedFwd, okFwd := wayid2speed[(int64)(wayid)]
speedBwd, okBwd := wayid2speed[(int64)(-wayid)]

speedsFwd := make([]int, nodesInWayCnt)
speedsBwd := make([]int, nodesInWayCnt)

for i := range elements[1:] {
speedsFwd[i] = speedFwd
speedsBwd[i] = speedBwd
}

segmentsFwd := segmentsOfWay[(int64)(wayid)]
segmentsBwd := segmentsOfWay[(int64)(-wayid)]

getSpeedOfSegments(segmentsFwd, speedsFwd, nodesInWayCnt)
getSpeedOfSegments(segmentsBwd, speedsBwd, nodesInWayCnt)

if okFwd || okBwd {
var nodes []string = elements[1:]
wayMatched += 1
Expand All @@ -94,11 +113,11 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
}
if okFwd {
fwdRecordCnt += 1
sink <- generateSingleRecord(n1, n2, speedFwd, true)
sink <- generateSingleRecord(n1, n2, speedsFwd[i], true)
}
if okBwd {
bwdRecordCnt += 1
sink <- generateSingleRecord(n1, n2, speedBwd, false)
sink <- generateSingleRecord(n1, n2, speedsBwd[i], false)
}

}
Expand All @@ -109,6 +128,17 @@ func task(wayid2speed map[int64]int, source <-chan string, sink chan<- string, d
tasksWg.Done()
}

func getSpeedOfSegments(segments []*trafficproxy.SegmentedFlow, speeds []int, nodesCnt uint64) {
for _, segment := range segments {
indexOfBegin := int(math.Floor(float64(nodesCnt) * float64(segment.Begin) / 100))
indexOfEnd := int(math.Floor(float64(nodesCnt) * float64(segment.End) / 100))

for i := indexOfBegin; i < indexOfEnd; i++ {
speeds[i] = int(segment.Speed)
}
}
}

// format
// if dir = true, means traffic for forward, generate: from, to, speed
// if dir = false, means this speed is for backward flow, generate: to, from, speed
Expand Down
34 changes: 33 additions & 1 deletion integration/cmd/osrm-traffic-updater/speed_table_dumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strconv"
"strings"
"testing"

"github.com/Telenav/osrm-backend/integration/traffic/livetraffic/trafficproxy"
)

func TestSpeedTableDumper1(t *testing.T) {
Expand All @@ -23,16 +25,41 @@ func TestSpeedTableDumper1(t *testing.T) {

// construct mock traffic
wayid2speed := make(map[int64]int)
segmentsOfWay := make(map[int64][]*trafficproxy.SegmentedFlow)
loadMockTrafficFlow2Map(wayid2speed)

var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, sources, "./testdata/target.csv", &ds)
dumpSpeedTable4Customize(wayid2speed, segmentsOfWay, sources, "./testdata/target.csv", &ds)

compareFileContentUnstable("./testdata/target.csv", "./testdata/expect.csv", t)
validateStatistic(&ds, t)
}

func TestSpeedTableDumper2(t *testing.T) {
// load result into sources
var sources [TASKNUM]chan string
for i := range sources {
sources[i] = make(chan string, 10000)
}
go loadWay2NodeidsTable("./testdata/id-mapping-segment.csv.snappy", sources)

// construct mock traffic
wayid2speed := make(map[int64]int)
wayid2speed[733690162] = 60
wayid2speed[-733689924] = 60

segmentsOfWay := make(map[int64][]*trafficproxy.SegmentedFlow)
loadMockTrafficFlowSegment2Map(segmentsOfWay)

var ds dumperStatistic
ds.Init(TASKNUM)
dumpSpeedTable4Customize(wayid2speed, segmentsOfWay, sources, "./testdata/target-segment.csv", &ds)

compareFileContentUnstable("./testdata/target-segment.csv", "./testdata/expect-segment.csv", t)
// validateStatistic(&ds, t)
}

func TestGenerateSingleRecord1(t *testing.T) {
str := generateSingleRecord(12345, 54321, 33, true)
if strings.Compare(str, "12345,54321,33\n") != 0 {
Expand Down Expand Up @@ -62,6 +89,11 @@ func loadMockTrafficFlow2Map(wayid2speed map[int64]int) {
wayid2speed[-24418344] = 59
}

func loadMockTrafficFlowSegment2Map(segmentsOfWay map[int64][]*trafficproxy.SegmentedFlow) {
segmentsOfWay[733690162] = []*trafficproxy.SegmentedFlow{{Speed: 25, Begin: 25, End: 75}}
segmentsOfWay[-733689924] = []*trafficproxy.SegmentedFlow{{Speed: 25, Begin: 10, End: 50}}
}

type tNodePair struct {
f, t uint64
}
Expand Down
11 changes: 11 additions & 0 deletions integration/cmd/osrm-traffic-updater/testdata/expect-segment.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
1253042677,6871726226,25
6871775001,1253042677,25
6871775003,6871775001,60
6871726248,6871775003,60
6871726238,6871744979,60
6871744979,6871744978,60
6871744978,6871744977,25
6871744977,6871744976,25
6871744976,6871744975,25
6871744975,6871744974,25
6871744974,6871744973,60
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
733690162,6871726238,6871744979,6871744978,6871744977,6871744976,6871744975,6871744974,6871744973
733689924,6871726226,1253042677,6871775001,6871775003,6871726248
Binary file not shown.
12 changes: 6 additions & 6 deletions integration/traffic/livetraffic/trafficproxy/csv_string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ func TestFlowCSVString(t *testing.T) {
humanFriendlyCSVString string
}{
{
Flow{WayID: 829733412, Speed: 20.280001, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000},
"829733412,20.280001,7,1579419488000",
"829733412,20.280001,FREE_FLOW,1579419488000",
Flow{WayID: 829733412, Speed: 20.280001, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000, SegmentedFlow: []*SegmentedFlow{{Speed: 25, Begin: 25, End: 75}}},
"829733412,20.280001,7,1579419488000,[speed:25 begin:25 end:75 ]",
"829733412,20.280001,FREE_FLOW,1579419488000,[speed:25 begin:25 end:75 ]",
},
{
Flow{WayID: -129639168, Speed: 31.389999, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000},
"-129639168,31.389999,7,1579419488000",
"-129639168,31.389999,FREE_FLOW,1579419488000",
Flow{WayID: -129639168, Speed: 31.389999, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000, SegmentedFlow: []*SegmentedFlow{}},
"-129639168,31.389999,7,1579419488000,[]",
"-129639168,31.389999,FREE_FLOW,1579419488000,[]",
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ func (f *Flow) IsBlocking() bool {
// CSVString represents Flow as defined CSV format.
// I.e. 'wayID,Speed,TrafficLevel,Timestamp'
func (f *Flow) CSVString() string {
return fmt.Sprintf("%d,%f,%d,%d", f.WayID, f.Speed, f.TrafficLevel, f.Timestamp)
return fmt.Sprintf("%d,%f,%d,%d,%v", f.WayID, f.Speed, f.TrafficLevel, f.Timestamp, f.SegmentedFlow)
}

// HumanFriendlyCSVString represents Flow as defined CSV format, but prefer human friendly string instead of integer.
// I.e. 'wayID,Speed,TrafficLevel,Timestamp'
func (f *Flow) HumanFriendlyCSVString() string {
return fmt.Sprintf("%d,%f,%s,%d", f.WayID, f.Speed, f.TrafficLevel, f.Timestamp)
return fmt.Sprintf("%d,%f,%s,%d,%v", f.WayID, f.Speed, f.TrafficLevel, f.Timestamp, f.SegmentedFlow)
}
Loading

0 comments on commit ae0438a

Please sign in to comment.