diff --git a/integration/cmd/osrm-traffic-updater/osrm_traffic_updater.go b/integration/cmd/osrm-traffic-updater/osrm_traffic_updater.go index 0b33d2ba6b0..c8fabb08b14 100644 --- a/integration/cmd/osrm-traffic-updater/osrm_traffic_updater.go +++ b/integration/cmd/osrm-traffic-updater/osrm_traffic_updater.go @@ -6,7 +6,7 @@ import ( "log" "time" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/Telenav/osrm-backend/integration/pkg/trafficproxyclient" "github.com/golang/glog" ) @@ -83,7 +83,7 @@ loop: return isFlowDone } -func trafficData2map(trafficData proxy.TrafficResponse, m map[int64]int) { +func trafficData2map(trafficData trafficproxy.TrafficResponse, m map[int64]int) { startTime := time.Now() defer func() { log.Printf("Processing time for building traffic map takes %f seconds\n", time.Now().Sub(startTime).Seconds()) @@ -100,7 +100,7 @@ func trafficData2map(trafficData proxy.TrafficResponse, m map[int64]int) { } } - wayid := flow.Flow.WayId + wayid := flow.Flow.WayID m[wayid] = int(flow.Flow.Speed) if wayid > 0 { @@ -114,9 +114,9 @@ func trafficData2map(trafficData proxy.TrafficResponse, m map[int64]int) { for _, incident := range trafficData.IncidentResponses { if incident.Incident.IsBlocking { // only use blocking incidents blockingIncidentCnt++ - blockingIncidentAffectedWaysCnt += int64(len(incident.Incident.AffectedWayIds)) + blockingIncidentAffectedWaysCnt += int64(len(incident.Incident.AffectedWayIDs)) - for _, wayid := range incident.Incident.AffectedWayIds { + for _, wayid := range incident.Incident.AffectedWayIDs { m[wayid] = 0 if wayid > 0 { diff --git a/integration/cmd/trafficproxy-cli/main.go b/integration/cmd/trafficproxy-cli/main.go index e31545cd05d..124b2f77863 100644 --- a/integration/cmd/trafficproxy-cli/main.go +++ b/integration/cmd/trafficproxy-cli/main.go @@ -3,7 +3,7 @@ package main import ( "flag" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/Telenav/osrm-backend/integration/pkg/trafficproxyclient" "github.com/Telenav/osrm-backend/integration/trafficdumper" "github.com/golang/glog" @@ -51,7 +51,7 @@ func main() { return } else if flags.rpcMode == rpcModeStreamingDelta { - responseChan := make(chan proxy.TrafficResponse) + responseChan := make(chan trafficproxy.TrafficResponse) waitChan := make(chan struct{}) // async startup dumper diff --git a/integration/pkg/trafficeater/interface.go b/integration/pkg/trafficeater/interface.go index 41cc2bbfad1..45cd8e26d38 100644 --- a/integration/pkg/trafficeater/interface.go +++ b/integration/pkg/trafficeater/interface.go @@ -1,10 +1,10 @@ package trafficeater -import proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" +import "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" // Eater is the interface that wraps the basic Eat method. type Eater interface { // Eat consumes traffic responses. - Eat(proxy.TrafficResponse) + Eat(trafficproxy.TrafficResponse) } diff --git a/integration/pkg/trafficproxy/csv_string_test.go b/integration/pkg/trafficproxy/csv_string_test.go index 06f4333d703..4a4f74d4cfa 100644 --- a/integration/pkg/trafficproxy/csv_string_test.go +++ b/integration/pkg/trafficproxy/csv_string_test.go @@ -1,4 +1,4 @@ -package proxy +package trafficproxy import "testing" @@ -10,14 +10,14 @@ func TestFlowCSVString(t *testing.T) { humanFriendlyCSVString string }{ { - Flow{WayId: 829733412, Speed: 20.280001, TrafficLevel: TrafficLevel_FREE_FLOW}, - "829733412,20.280001,7", - "829733412,20.280001,FREE_FLOW", + Flow{WayID: 829733412, Speed: 20.280001, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000}, + "829733412,20.280001,7,1579419488000", + "829733412,20.280001,FREE_FLOW,1579419488000", }, { - Flow{WayId: -129639168, Speed: 31.389999, TrafficLevel: TrafficLevel_FREE_FLOW}, - "-129639168,31.389999,7", - "-129639168,31.389999,FREE_FLOW", + Flow{WayID: -129639168, Speed: 31.389999, TrafficLevel: TrafficLevel_FREE_FLOW, Timestamp: 1579419488000}, + "-129639168,31.389999,7,1579419488000", + "-129639168,31.389999,FREE_FLOW,1579419488000", }, } @@ -44,8 +44,8 @@ func TestIncidentCSVString(t *testing.T) { }{ { Incident{ - IncidentId: "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1", - AffectedWayIds: []int64{100663296, -1204020275, 100663296, -1204020274, 100663296, -916744017, 100663296, -1204020245, 100663296, -1194204646, 100663296, -1204394608, 100663296, -1194204647, 100663296, -129639168, 100663296, -1194204645}, + IncidentID: "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1", + AffectedWayIDs: []int64{100663296, -1204020275, 100663296, -1204020274, 100663296, -916744017, 100663296, -1204020245, 100663296, -1194204646, 100663296, -1204394608, 100663296, -1194204647, 100663296, -129639168, 100663296, -1194204645}, IncidentType: IncidentType_MISCELLANEOUS, IncidentSeverity: IncidentSeverity_CRITICAL, IncidentLocation: &Location{Lat: 44.181220, Lon: -117.135840}, @@ -56,9 +56,10 @@ func TestIncidentCSVString(t *testing.T) { EventCode: 500, AlertCEventQuantifier: 0, IsBlocking: false, + Timestamp: 1579419488000, }, - "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1,\"100663296,-1204020275,100663296,-1204020274,100663296,-916744017,100663296,-1204020245,100663296,-1194204646,100663296,-1204394608,100663296,-1194204647,100663296,-129639168,100663296,-1194204645\",5,1,44.181220,-117.135840,\"Construction on I-84 EB near MP 359, Drive with caution.\",,,I-84 E,500,0,0", - "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1,\"100663296,-1204020275,100663296,-1204020274,100663296,-916744017,100663296,-1204020245,100663296,-1194204646,100663296,-1204394608,100663296,-1194204647,100663296,-129639168,100663296,-1194204645\",MISCELLANEOUS,CRITICAL,44.181220,-117.135840,\"Construction on I-84 EB near MP 359, Drive with caution.\",,,I-84 E,500,0,false", + "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1,\"100663296,-1204020275,100663296,-1204020274,100663296,-916744017,100663296,-1204020245,100663296,-1194204646,100663296,-1204394608,100663296,-1194204647,100663296,-129639168,100663296,-1194204645\",5,1,44.181220,-117.135840,\"Construction on I-84 EB near MP 359, Drive with caution.\",,,I-84 E,500,0,0,1579419488000", + "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1,\"100663296,-1204020275,100663296,-1204020274,100663296,-916744017,100663296,-1204020245,100663296,-1194204646,100663296,-1204394608,100663296,-1194204647,100663296,-129639168,100663296,-1194204645\",MISCELLANEOUS,CRITICAL,44.181220,-117.135840,\"Construction on I-84 EB near MP 359, Drive with caution.\",,,I-84 E,500,0,false,1579419488000", }, } diff --git a/integration/pkg/trafficproxy/flow_extension.go b/integration/pkg/trafficproxy/flow_extension.go index 89bf6ca1c25..16715bb7a7c 100644 --- a/integration/pkg/trafficproxy/flow_extension.go +++ b/integration/pkg/trafficproxy/flow_extension.go @@ -1,24 +1,22 @@ -package proxy +package trafficproxy import "fmt" -const blockingSpeedThreshold = 1 // Think it's blocking if flow speed smaller than this threshold. - // IsBlocking tests whether the Flow is blocking or not. // This function extends protoc-gen-go generated code on testing whether is blocking for Flow. func (f *Flow) IsBlocking() bool { - return f.TrafficLevel == TrafficLevel_CLOSED || f.Speed < blockingSpeedThreshold + return f.TrafficLevel == TrafficLevel_CLOSED } // CSVString represents Flow as defined CSV format. -// I.e. 'wayID,Speed,TrafficLevel' +// I.e. 'wayID,Speed,TrafficLevel,Timestamp' func (f *Flow) CSVString() string { - return fmt.Sprintf("%d,%f,%d", f.WayId, f.Speed, f.TrafficLevel) + return fmt.Sprintf("%d,%f,%d,%d", f.WayID, f.Speed, f.TrafficLevel, f.Timestamp) } // HumanFriendlyCSVString represents Flow as defined CSV format, but prefer human friendly string instead of integer. -// I.e. 'wayID,Speed,TrafficLevel' +// I.e. 'wayID,Speed,TrafficLevel,Timestamp' func (f *Flow) HumanFriendlyCSVString() string { - return fmt.Sprintf("%d,%f,%s", f.WayId, f.Speed, f.TrafficLevel) + return fmt.Sprintf("%d,%f,%s,%d", f.WayID, f.Speed, f.TrafficLevel, f.Timestamp) } diff --git a/integration/pkg/trafficproxy/incident_extension.go b/integration/pkg/trafficproxy/incident_extension.go index 26e14e0776e..ca5da9f6c56 100644 --- a/integration/pkg/trafficproxy/incident_extension.go +++ b/integration/pkg/trafficproxy/incident_extension.go @@ -1,4 +1,4 @@ -package proxy +package trafficproxy import ( "bytes" @@ -20,10 +20,10 @@ func (i *Incident) HumanFriendlyCSVString() string { func (i *Incident) csvString(humanFriendly bool) string { records := []string{} - records = append(records, i.IncidentId) + records = append(records, i.IncidentID) affectedWayIDsString := []string{} - for _, wayID := range i.AffectedWayIds { + for _, wayID := range i.AffectedWayIDs { affectedWayIDsString = append(affectedWayIDsString, strconv.FormatInt(wayID, 10)) } records = append(records, strings.Join(affectedWayIDsString, ",")) @@ -53,6 +53,8 @@ func (i *Incident) csvString(humanFriendly bool) string { records = append(records, strconv.Itoa(isBlockingInteger)) } + records = append(records, strconv.Itoa(int(i.Timestamp))) + var buff bytes.Buffer w := csv.NewWriter(&buff) w.UseCRLF = false diff --git a/integration/pkg/trafficproxy/proxy.pb.go b/integration/pkg/trafficproxy/trafficproxy.pb.go similarity index 66% rename from integration/pkg/trafficproxy/proxy.pb.go rename to integration/pkg/trafficproxy/trafficproxy.pb.go index 7f41e0c91e8..13f01dadced 100644 --- a/integration/pkg/trafficproxy/proxy.pb.go +++ b/integration/pkg/trafficproxy/trafficproxy.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: proxy.proto +// source: trafficproxy.proto -package proxy +package trafficproxy import ( context "context" @@ -46,7 +46,7 @@ func (x TrafficType) String() string { } func (TrafficType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{0} + return fileDescriptor_9c857d02e32d0eb6, []int{0} } type TrafficLevel int32 @@ -83,27 +83,24 @@ func (x TrafficLevel) String() string { } func (TrafficLevel) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{1} + return fileDescriptor_9c857d02e32d0eb6, []int{1} } type Action int32 const ( - Action_ADD Action = 0 - Action_UPDATE Action = 1 - Action_DELETE Action = 2 + Action_UPDATE Action = 0 + Action_DELETE Action = 1 ) var Action_name = map[int32]string{ - 0: "ADD", - 1: "UPDATE", - 2: "DELETE", + 0: "UPDATE", + 1: "DELETE", } var Action_value = map[string]int32{ - "ADD": 0, - "UPDATE": 1, - "DELETE": 2, + "UPDATE": 0, + "DELETE": 1, } func (x Action) String() string { @@ -111,7 +108,7 @@ func (x Action) String() string { } func (Action) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{2} + return fileDescriptor_9c857d02e32d0eb6, []int{2} } type IncidentType int32 @@ -169,7 +166,7 @@ func (x IncidentType) String() string { } func (IncidentType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{3} + return fileDescriptor_9c857d02e32d0eb6, []int{3} } type IncidentSeverity int32 @@ -203,27 +200,31 @@ func (x IncidentSeverity) String() string { } func (IncidentSeverity) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{4} + return fileDescriptor_9c857d02e32d0eb6, []int{4} } type TrafficRequest struct { TrafficSource *TrafficSource `protobuf:"bytes,1,opt,name=trafficSource,proto3" json:"trafficSource,omitempty"` - TrafficType []TrafficType `protobuf:"varint,2,rep,packed,name=trafficType,proto3,enum=proxy.TrafficType" json:"trafficType,omitempty"` + TrafficType []TrafficType `protobuf:"varint,2,rep,packed,name=trafficType,proto3,enum=trafficproxy.TrafficType" json:"trafficType,omitempty"` // Types that are valid to be assigned to RequestOneof: // *TrafficRequest_TrafficAllRequest - // *TrafficRequest_TrafficWayIdsRequest + // *TrafficRequest_TrafficWayIDsRequest // *TrafficRequest_TrafficStreamingDeltaRequest - RequestOneof isTrafficRequest_RequestOneof `protobuf_oneof:"request_oneof"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + RequestOneof isTrafficRequest_RequestOneof `protobuf_oneof:"request_oneof"` + // timestamp of expected traffic data, number of milliseconds since the Epoch. + // 0 means request for current(live) traffic data, otherwise request for archived traffic data. + // For archived traffic data, only allows trafficAllRequest and trafficWayIDsRequest. + Timestamp int64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *TrafficRequest) Reset() { *m = TrafficRequest{} } func (m *TrafficRequest) String() string { return proto.CompactTextString(m) } func (*TrafficRequest) ProtoMessage() {} func (*TrafficRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{0} + return fileDescriptor_9c857d02e32d0eb6, []int{0} } func (m *TrafficRequest) XXX_Unmarshal(b []byte) error { @@ -266,8 +267,8 @@ type TrafficRequest_TrafficAllRequest struct { TrafficAllRequest *TrafficAllRequest `protobuf:"bytes,3,opt,name=trafficAllRequest,proto3,oneof"` } -type TrafficRequest_TrafficWayIdsRequest struct { - TrafficWayIdsRequest *TrafficWayIdsRequest `protobuf:"bytes,4,opt,name=trafficWayIdsRequest,proto3,oneof"` +type TrafficRequest_TrafficWayIDsRequest struct { + TrafficWayIDsRequest *TrafficWayIDsRequest `protobuf:"bytes,4,opt,name=trafficWayIDsRequest,proto3,oneof"` } type TrafficRequest_TrafficStreamingDeltaRequest struct { @@ -276,7 +277,7 @@ type TrafficRequest_TrafficStreamingDeltaRequest struct { func (*TrafficRequest_TrafficAllRequest) isTrafficRequest_RequestOneof() {} -func (*TrafficRequest_TrafficWayIdsRequest) isTrafficRequest_RequestOneof() {} +func (*TrafficRequest_TrafficWayIDsRequest) isTrafficRequest_RequestOneof() {} func (*TrafficRequest_TrafficStreamingDeltaRequest) isTrafficRequest_RequestOneof() {} @@ -294,9 +295,9 @@ func (m *TrafficRequest) GetTrafficAllRequest() *TrafficAllRequest { return nil } -func (m *TrafficRequest) GetTrafficWayIdsRequest() *TrafficWayIdsRequest { - if x, ok := m.GetRequestOneof().(*TrafficRequest_TrafficWayIdsRequest); ok { - return x.TrafficWayIdsRequest +func (m *TrafficRequest) GetTrafficWayIDsRequest() *TrafficWayIDsRequest { + if x, ok := m.GetRequestOneof().(*TrafficRequest_TrafficWayIDsRequest); ok { + return x.TrafficWayIDsRequest } return nil } @@ -308,11 +309,18 @@ func (m *TrafficRequest) GetTrafficStreamingDeltaRequest() *TrafficStreamingDelt return nil } +func (m *TrafficRequest) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*TrafficRequest) XXX_OneofWrappers() []interface{} { return []interface{}{ (*TrafficRequest_TrafficAllRequest)(nil), - (*TrafficRequest_TrafficWayIdsRequest)(nil), + (*TrafficRequest_TrafficWayIDsRequest)(nil), (*TrafficRequest_TrafficStreamingDeltaRequest)(nil), } } @@ -327,7 +335,7 @@ func (m *TrafficAllRequest) Reset() { *m = TrafficAllRequest{} } func (m *TrafficAllRequest) String() string { return proto.CompactTextString(m) } func (*TrafficAllRequest) ProtoMessage() {} func (*TrafficAllRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{1} + return fileDescriptor_9c857d02e32d0eb6, []int{1} } func (m *TrafficAllRequest) XXX_Unmarshal(b []byte) error { @@ -348,41 +356,41 @@ func (m *TrafficAllRequest) XXX_DiscardUnknown() { var xxx_messageInfo_TrafficAllRequest proto.InternalMessageInfo -type TrafficWayIdsRequest struct { - WayIds []int64 `protobuf:"zigzag64,1,rep,packed,name=wayIds,proto3" json:"wayIds,omitempty"` +type TrafficWayIDsRequest struct { + WayIDs []int64 `protobuf:"zigzag64,1,rep,packed,name=wayIDs,proto3" json:"wayIDs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *TrafficWayIdsRequest) Reset() { *m = TrafficWayIdsRequest{} } -func (m *TrafficWayIdsRequest) String() string { return proto.CompactTextString(m) } -func (*TrafficWayIdsRequest) ProtoMessage() {} -func (*TrafficWayIdsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{2} +func (m *TrafficWayIDsRequest) Reset() { *m = TrafficWayIDsRequest{} } +func (m *TrafficWayIDsRequest) String() string { return proto.CompactTextString(m) } +func (*TrafficWayIDsRequest) ProtoMessage() {} +func (*TrafficWayIDsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_9c857d02e32d0eb6, []int{2} } -func (m *TrafficWayIdsRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TrafficWayIdsRequest.Unmarshal(m, b) +func (m *TrafficWayIDsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TrafficWayIDsRequest.Unmarshal(m, b) } -func (m *TrafficWayIdsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TrafficWayIdsRequest.Marshal(b, m, deterministic) +func (m *TrafficWayIDsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TrafficWayIDsRequest.Marshal(b, m, deterministic) } -func (m *TrafficWayIdsRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_TrafficWayIdsRequest.Merge(m, src) +func (m *TrafficWayIDsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TrafficWayIDsRequest.Merge(m, src) } -func (m *TrafficWayIdsRequest) XXX_Size() int { - return xxx_messageInfo_TrafficWayIdsRequest.Size(m) +func (m *TrafficWayIDsRequest) XXX_Size() int { + return xxx_messageInfo_TrafficWayIDsRequest.Size(m) } -func (m *TrafficWayIdsRequest) XXX_DiscardUnknown() { - xxx_messageInfo_TrafficWayIdsRequest.DiscardUnknown(m) +func (m *TrafficWayIDsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TrafficWayIDsRequest.DiscardUnknown(m) } -var xxx_messageInfo_TrafficWayIdsRequest proto.InternalMessageInfo +var xxx_messageInfo_TrafficWayIDsRequest proto.InternalMessageInfo -func (m *TrafficWayIdsRequest) GetWayIds() []int64 { +func (m *TrafficWayIDsRequest) GetWayIDs() []int64 { if m != nil { - return m.WayIds + return m.WayIDs } return nil } @@ -398,7 +406,7 @@ func (m *TrafficStreamingDeltaRequest) Reset() { *m = TrafficStreamingDe func (m *TrafficStreamingDeltaRequest) String() string { return proto.CompactTextString(m) } func (*TrafficStreamingDeltaRequest) ProtoMessage() {} func (*TrafficStreamingDeltaRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{3} + return fileDescriptor_9c857d02e32d0eb6, []int{3} } func (m *TrafficStreamingDeltaRequest) XXX_Unmarshal(b []byte) error { @@ -443,7 +451,7 @@ func (m *TrafficStreamingDeltaRequest_StreamingRule) String() string { } func (*TrafficStreamingDeltaRequest_StreamingRule) ProtoMessage() {} func (*TrafficStreamingDeltaRequest_StreamingRule) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{3, 0} + return fileDescriptor_9c857d02e32d0eb6, []int{3, 0} } func (m *TrafficStreamingDeltaRequest_StreamingRule) XXX_Unmarshal(b []byte) error { @@ -490,7 +498,7 @@ func (m *TrafficResponse) Reset() { *m = TrafficResponse{} } func (m *TrafficResponse) String() string { return proto.CompactTextString(m) } func (*TrafficResponse) ProtoMessage() {} func (*TrafficResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{4} + return fileDescriptor_9c857d02e32d0eb6, []int{4} } func (m *TrafficResponse) XXX_Unmarshal(b []byte) error { @@ -529,6 +537,7 @@ type TrafficSource struct { Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` TrafficProvider string `protobuf:"bytes,2,opt,name=trafficProvider,proto3" json:"trafficProvider,omitempty"` MapProvider string `protobuf:"bytes,3,opt,name=mapProvider,proto3" json:"mapProvider,omitempty"` + Subregion []string `protobuf:"bytes,4,rep,name=subregion,proto3" json:"subregion,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -538,7 +547,7 @@ func (m *TrafficSource) Reset() { *m = TrafficSource{} } func (m *TrafficSource) String() string { return proto.CompactTextString(m) } func (*TrafficSource) ProtoMessage() {} func (*TrafficSource) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{5} + return fileDescriptor_9c857d02e32d0eb6, []int{5} } func (m *TrafficSource) XXX_Unmarshal(b []byte) error { @@ -580,9 +589,17 @@ func (m *TrafficSource) GetMapProvider() string { return "" } +func (m *TrafficSource) GetSubregion() []string { + if m != nil { + return m.Subregion + } + return nil +} + type FlowResponse struct { Flow *Flow `protobuf:"bytes,1,opt,name=flow,proto3" json:"flow,omitempty"` - Action Action `protobuf:"varint,2,opt,name=action,proto3,enum=proxy.Action" json:"action,omitempty"` + Action Action `protobuf:"varint,2,opt,name=action,proto3,enum=trafficproxy.Action" json:"action,omitempty"` + TrasctionID string `protobuf:"bytes,3,opt,name=trasctionID,proto3" json:"trasctionID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -592,7 +609,7 @@ func (m *FlowResponse) Reset() { *m = FlowResponse{} } func (m *FlowResponse) String() string { return proto.CompactTextString(m) } func (*FlowResponse) ProtoMessage() {} func (*FlowResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{6} + return fileDescriptor_9c857d02e32d0eb6, []int{6} } func (m *FlowResponse) XXX_Unmarshal(b []byte) error { @@ -624,13 +641,21 @@ func (m *FlowResponse) GetAction() Action { if m != nil { return m.Action } - return Action_ADD + return Action_UPDATE +} + +func (m *FlowResponse) GetTrasctionID() string { + if m != nil { + return m.TrasctionID + } + return "" } type Flow struct { - WayId int64 `protobuf:"zigzag64,1,opt,name=wayId,proto3" json:"wayId,omitempty"` + WayID int64 `protobuf:"zigzag64,1,opt,name=wayID,proto3" json:"wayID,omitempty"` Speed float32 `protobuf:"fixed32,2,opt,name=speed,proto3" json:"speed,omitempty"` - TrafficLevel TrafficLevel `protobuf:"varint,3,opt,name=trafficLevel,proto3,enum=proxy.TrafficLevel" json:"trafficLevel,omitempty"` + TrafficLevel TrafficLevel `protobuf:"varint,3,opt,name=trafficLevel,proto3,enum=trafficproxy.TrafficLevel" json:"trafficLevel,omitempty"` + Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -640,7 +665,7 @@ func (m *Flow) Reset() { *m = Flow{} } func (m *Flow) String() string { return proto.CompactTextString(m) } func (*Flow) ProtoMessage() {} func (*Flow) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{7} + return fileDescriptor_9c857d02e32d0eb6, []int{7} } func (m *Flow) XXX_Unmarshal(b []byte) error { @@ -661,9 +686,9 @@ func (m *Flow) XXX_DiscardUnknown() { var xxx_messageInfo_Flow proto.InternalMessageInfo -func (m *Flow) GetWayId() int64 { +func (m *Flow) GetWayID() int64 { if m != nil { - return m.WayId + return m.WayID } return 0 } @@ -682,9 +707,17 @@ func (m *Flow) GetTrafficLevel() TrafficLevel { return TrafficLevel_NO_LEVELS } +func (m *Flow) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + type IncidentResponse struct { Incident *Incident `protobuf:"bytes,1,opt,name=incident,proto3" json:"incident,omitempty"` - Action Action `protobuf:"varint,2,opt,name=action,proto3,enum=proxy.Action" json:"action,omitempty"` + Action Action `protobuf:"varint,2,opt,name=action,proto3,enum=trafficproxy.Action" json:"action,omitempty"` + TrasctionID string `protobuf:"bytes,3,opt,name=trasctionID,proto3" json:"trasctionID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -694,7 +727,7 @@ func (m *IncidentResponse) Reset() { *m = IncidentResponse{} } func (m *IncidentResponse) String() string { return proto.CompactTextString(m) } func (*IncidentResponse) ProtoMessage() {} func (*IncidentResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{8} + return fileDescriptor_9c857d02e32d0eb6, []int{8} } func (m *IncidentResponse) XXX_Unmarshal(b []byte) error { @@ -726,14 +759,21 @@ func (m *IncidentResponse) GetAction() Action { if m != nil { return m.Action } - return Action_ADD + return Action_UPDATE +} + +func (m *IncidentResponse) GetTrasctionID() string { + if m != nil { + return m.TrasctionID + } + return "" } type Incident struct { - IncidentId string `protobuf:"bytes,1,opt,name=incidentId,proto3" json:"incidentId,omitempty"` - AffectedWayIds []int64 `protobuf:"zigzag64,2,rep,packed,name=affectedWayIds,proto3" json:"affectedWayIds,omitempty"` - IncidentType IncidentType `protobuf:"varint,3,opt,name=incidentType,proto3,enum=proxy.IncidentType" json:"incidentType,omitempty"` - IncidentSeverity IncidentSeverity `protobuf:"varint,4,opt,name=incidentSeverity,proto3,enum=proxy.IncidentSeverity" json:"incidentSeverity,omitempty"` + IncidentID string `protobuf:"bytes,1,opt,name=incidentID,proto3" json:"incidentID,omitempty"` + AffectedWayIDs []int64 `protobuf:"zigzag64,2,rep,packed,name=affectedWayIDs,proto3" json:"affectedWayIDs,omitempty"` + IncidentType IncidentType `protobuf:"varint,3,opt,name=incidentType,proto3,enum=trafficproxy.IncidentType" json:"incidentType,omitempty"` + IncidentSeverity IncidentSeverity `protobuf:"varint,4,opt,name=incidentSeverity,proto3,enum=trafficproxy.IncidentSeverity" json:"incidentSeverity,omitempty"` IncidentLocation *Location `protobuf:"bytes,5,opt,name=incidentLocation,proto3" json:"incidentLocation,omitempty"` Description string `protobuf:"bytes,6,opt,name=description,proto3" json:"description,omitempty"` FirstCrossStreet string `protobuf:"bytes,7,opt,name=firstCrossStreet,proto3" json:"firstCrossStreet,omitempty"` @@ -742,6 +782,7 @@ type Incident struct { EventCode int32 `protobuf:"varint,10,opt,name=eventCode,proto3" json:"eventCode,omitempty"` AlertCEventQuantifier int32 `protobuf:"varint,11,opt,name=alertCEventQuantifier,proto3" json:"alertCEventQuantifier,omitempty"` IsBlocking bool `protobuf:"varint,12,opt,name=isBlocking,proto3" json:"isBlocking,omitempty"` + Timestamp int64 `protobuf:"varint,13,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -751,7 +792,7 @@ func (m *Incident) Reset() { *m = Incident{} } func (m *Incident) String() string { return proto.CompactTextString(m) } func (*Incident) ProtoMessage() {} func (*Incident) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{9} + return fileDescriptor_9c857d02e32d0eb6, []int{9} } func (m *Incident) XXX_Unmarshal(b []byte) error { @@ -772,16 +813,16 @@ func (m *Incident) XXX_DiscardUnknown() { var xxx_messageInfo_Incident proto.InternalMessageInfo -func (m *Incident) GetIncidentId() string { +func (m *Incident) GetIncidentID() string { if m != nil { - return m.IncidentId + return m.IncidentID } return "" } -func (m *Incident) GetAffectedWayIds() []int64 { +func (m *Incident) GetAffectedWayIDs() []int64 { if m != nil { - return m.AffectedWayIds + return m.AffectedWayIDs } return nil } @@ -856,6 +897,13 @@ func (m *Incident) GetIsBlocking() bool { return false } +func (m *Incident) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + type Location struct { Lat float64 `protobuf:"fixed64,1,opt,name=lat,proto3" json:"lat,omitempty"` Lon float64 `protobuf:"fixed64,2,opt,name=lon,proto3" json:"lon,omitempty"` @@ -868,7 +916,7 @@ func (m *Location) Reset() { *m = Location{} } func (m *Location) String() string { return proto.CompactTextString(m) } func (*Location) ProtoMessage() {} func (*Location) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{10} + return fileDescriptor_9c857d02e32d0eb6, []int{10} } func (m *Location) XXX_Unmarshal(b []byte) error { @@ -904,98 +952,103 @@ func (m *Location) GetLon() float64 { } func init() { - proto.RegisterEnum("proxy.TrafficType", TrafficType_name, TrafficType_value) - proto.RegisterEnum("proxy.TrafficLevel", TrafficLevel_name, TrafficLevel_value) - proto.RegisterEnum("proxy.Action", Action_name, Action_value) - proto.RegisterEnum("proxy.IncidentType", IncidentType_name, IncidentType_value) - proto.RegisterEnum("proxy.IncidentSeverity", IncidentSeverity_name, IncidentSeverity_value) - proto.RegisterType((*TrafficRequest)(nil), "proxy.TrafficRequest") - proto.RegisterType((*TrafficAllRequest)(nil), "proxy.TrafficAllRequest") - proto.RegisterType((*TrafficWayIdsRequest)(nil), "proxy.TrafficWayIdsRequest") - proto.RegisterType((*TrafficStreamingDeltaRequest)(nil), "proxy.TrafficStreamingDeltaRequest") - proto.RegisterType((*TrafficStreamingDeltaRequest_StreamingRule)(nil), "proxy.TrafficStreamingDeltaRequest.StreamingRule") - proto.RegisterType((*TrafficResponse)(nil), "proxy.TrafficResponse") - proto.RegisterType((*TrafficSource)(nil), "proxy.TrafficSource") - proto.RegisterType((*FlowResponse)(nil), "proxy.FlowResponse") - proto.RegisterType((*Flow)(nil), "proxy.Flow") - proto.RegisterType((*IncidentResponse)(nil), "proxy.IncidentResponse") - proto.RegisterType((*Incident)(nil), "proxy.Incident") - proto.RegisterType((*Location)(nil), "proxy.Location") -} - -func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) } - -var fileDescriptor_700b50b08ed8dbaf = []byte{ - // 1098 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0xdd, 0x6e, 0xdb, 0x46, - 0x13, 0x15, 0xf5, 0xaf, 0xa1, 0x24, 0xaf, 0x37, 0x4e, 0x3e, 0xc1, 0x9f, 0xd1, 0x1a, 0x2c, 0x52, - 0xb8, 0x6a, 0x61, 0xb4, 0x6e, 0x80, 0xa2, 0xed, 0x15, 0x4d, 0xae, 0x23, 0x36, 0x34, 0xa9, 0x2c, - 0x29, 0x0b, 0xe8, 0x8d, 0xc0, 0x4a, 0x2b, 0x83, 0x88, 0x4c, 0xaa, 0x24, 0xed, 0xc4, 0x7d, 0x8d, - 0x5e, 0x16, 0x7d, 0x93, 0xbe, 0x55, 0x5f, 0xa0, 0xd8, 0x25, 0x29, 0x91, 0x92, 0x11, 0xe4, 0x8e, - 0x73, 0xce, 0xcc, 0xec, 0x9c, 0x99, 0xd9, 0x95, 0x40, 0x5e, 0x47, 0xe1, 0x87, 0xc7, 0xf3, 0x75, - 0x14, 0x26, 0x21, 0x6e, 0x08, 0x43, 0xf9, 0xab, 0x06, 0x7d, 0x37, 0xf2, 0x96, 0x4b, 0x7f, 0x4e, - 0xd9, 0xef, 0xf7, 0x2c, 0x4e, 0xf0, 0x4f, 0xd0, 0x4b, 0x52, 0xc4, 0x09, 0xef, 0xa3, 0x39, 0x1b, - 0x48, 0xa7, 0xd2, 0x99, 0x7c, 0x71, 0x74, 0x9e, 0x86, 0xbb, 0x45, 0x8e, 0x96, 0x5d, 0xf1, 0x2b, - 0x90, 0x33, 0xc0, 0x7d, 0x5c, 0xb3, 0x41, 0xf5, 0xb4, 0x76, 0xd6, 0xbf, 0xc0, 0xe5, 0x48, 0xce, - 0xd0, 0xa2, 0x1b, 0x1e, 0xc1, 0x61, 0x66, 0xaa, 0xab, 0x55, 0x56, 0xc6, 0xa0, 0x26, 0x4e, 0x1d, - 0x94, 0x63, 0xb7, 0xfc, 0xa8, 0x42, 0xf7, 0x83, 0xf0, 0x5b, 0x38, 0xca, 0xc0, 0xa9, 0xf7, 0x68, - 0x2c, 0xe2, 0x3c, 0x59, 0x5d, 0x24, 0xfb, 0x7f, 0x39, 0x59, 0xc9, 0x65, 0x54, 0xa1, 0x4f, 0x86, - 0x62, 0x1f, 0x4e, 0x72, 0x8d, 0x49, 0xc4, 0xbc, 0x3b, 0x3f, 0xb8, 0xd5, 0xd9, 0x2a, 0xf1, 0xf2, - 0xd4, 0x0d, 0x91, 0xfa, 0x8b, 0x9d, 0xee, 0x3c, 0xe5, 0x3a, 0xaa, 0xd0, 0x8f, 0xa6, 0xba, 0x3c, - 0x80, 0x5e, 0x94, 0x7e, 0xce, 0xc2, 0x80, 0x85, 0x4b, 0xe5, 0x19, 0x1c, 0xee, 0x09, 0x57, 0xce, - 0xe1, 0xe8, 0x29, 0x01, 0xf8, 0x05, 0x34, 0xdf, 0x0b, 0x60, 0x20, 0x9d, 0xd6, 0xce, 0x30, 0xcd, - 0x2c, 0xe5, 0x1f, 0x09, 0x4e, 0x3e, 0x56, 0x16, 0x9e, 0x42, 0x2f, 0xce, 0x09, 0x7a, 0xbf, 0xca, - 0x07, 0xfe, 0xdd, 0x27, 0x48, 0x3a, 0x77, 0x8a, 0x81, 0xb4, 0x9c, 0xe7, 0x58, 0x83, 0x5e, 0x89, - 0xc7, 0x03, 0x68, 0xdd, 0x79, 0x1f, 0x1c, 0xff, 0x8f, 0xf4, 0x8c, 0x06, 0xcd, 0xcd, 0x8c, 0x71, - 0xfd, 0x3b, 0xbe, 0x34, 0x39, 0xc3, 0x4d, 0xe5, 0x4f, 0x09, 0x0e, 0x36, 0x1b, 0x1a, 0xaf, 0xc3, - 0x20, 0x66, 0xf8, 0x47, 0xe8, 0x2d, 0x57, 0xe1, 0xfb, 0xdc, 0x4e, 0x15, 0xcb, 0x17, 0xcf, 0xb2, - 0x8a, 0xaf, 0x0a, 0x1c, 0x2d, 0x7b, 0x62, 0x02, 0x87, 0x7e, 0x30, 0xf7, 0x17, 0x2c, 0x48, 0xb6, - 0xe1, 0x55, 0x11, 0xfe, 0xbf, 0x2c, 0xdc, 0xd8, 0xe1, 0xe9, 0x7e, 0x84, 0x12, 0x43, 0xaf, 0x74, - 0x11, 0x78, 0xf7, 0x23, 0x76, 0xeb, 0x87, 0x81, 0x50, 0xd6, 0xa1, 0x99, 0x85, 0xcf, 0xe0, 0x20, - 0x9b, 0xf9, 0x38, 0x0a, 0x1f, 0xfc, 0x05, 0x8b, 0x84, 0xc0, 0x0e, 0xdd, 0x85, 0xf1, 0x29, 0xc8, - 0x77, 0xde, 0x7a, 0xe3, 0x55, 0x13, 0x5e, 0x45, 0x48, 0xb9, 0x81, 0x6e, 0x51, 0x1a, 0xfe, 0x1c, - 0xea, 0x5c, 0x5c, 0x36, 0x2f, 0xb9, 0xa8, 0x5e, 0x10, 0xf8, 0x25, 0x34, 0xbd, 0x79, 0xc2, 0x8b, - 0xe2, 0x67, 0xf6, 0x2f, 0x7a, 0x99, 0x8b, 0x2a, 0x40, 0x9a, 0x91, 0xca, 0x3b, 0xa8, 0xf3, 0x20, - 0x7c, 0x04, 0x0d, 0xb1, 0x33, 0x22, 0x21, 0xa6, 0xa9, 0xc1, 0xd1, 0x78, 0xcd, 0xd8, 0x42, 0xe4, - 0xa8, 0xd2, 0xd4, 0xc0, 0x3f, 0x40, 0x37, 0x13, 0x60, 0xb2, 0x07, 0xb6, 0x12, 0xe5, 0xf6, 0x37, - 0x13, 0x70, 0x0b, 0x14, 0x2d, 0x39, 0x2a, 0x4b, 0x40, 0xbb, 0x0d, 0xc6, 0x5f, 0x43, 0x3b, 0x6f, - 0x71, 0x26, 0xe6, 0x60, 0x77, 0x16, 0x1b, 0x87, 0x4f, 0x15, 0xf5, 0x77, 0x1d, 0xda, 0x79, 0x34, - 0xfe, 0x0c, 0x20, 0x8f, 0xcf, 0xe4, 0x75, 0x68, 0x01, 0xc1, 0x5f, 0x42, 0xdf, 0x5b, 0x2e, 0xd9, - 0x3c, 0x61, 0x8b, 0xf4, 0x52, 0x89, 0x95, 0xc0, 0x74, 0x07, 0xe5, 0xaa, 0xf3, 0x28, 0xf1, 0xc0, - 0x95, 0x55, 0x1b, 0x05, 0x8a, 0x96, 0x1c, 0xb1, 0x06, 0x28, 0xb7, 0x1d, 0xf6, 0xc0, 0x22, 0x3f, - 0x79, 0x14, 0x8f, 0x52, 0x7f, 0x6f, 0xeb, 0x72, 0x9a, 0xee, 0x05, 0xe0, 0x9f, 0xb7, 0x49, 0xcc, - 0x70, 0xee, 0x89, 0x1e, 0x34, 0x4a, 0xed, 0xca, 0x61, 0xba, 0xe7, 0xc8, 0xd7, 0x6b, 0xc1, 0xe2, - 0x79, 0xe4, 0xaf, 0x45, 0x5c, 0x33, 0x5d, 0xaf, 0x02, 0x84, 0x87, 0x80, 0x96, 0x7e, 0x14, 0x27, - 0x5a, 0x14, 0xc6, 0x31, 0xbf, 0xb8, 0x2c, 0x19, 0xb4, 0x84, 0xdb, 0x1e, 0x8e, 0xbf, 0x81, 0xc3, - 0x98, 0xcd, 0xc3, 0x60, 0x51, 0x74, 0x6e, 0x0b, 0xe7, 0x7d, 0x82, 0xb7, 0x3f, 0x16, 0x5f, 0x96, - 0x77, 0xc7, 0x06, 0x9d, 0xb4, 0xfd, 0x5b, 0x04, 0x9f, 0x40, 0x87, 0x3d, 0xb0, 0x20, 0xd1, 0xc2, - 0x05, 0x1b, 0x80, 0xb8, 0xff, 0x5b, 0x00, 0xbf, 0x82, 0xe7, 0xde, 0x8a, 0x45, 0x89, 0x46, 0x38, - 0xf4, 0xf6, 0xde, 0x0b, 0x12, 0x7f, 0xe9, 0xb3, 0x68, 0x20, 0x0b, 0xcf, 0xa7, 0x49, 0x31, 0xf2, - 0xf8, 0x72, 0x15, 0xce, 0xdf, 0xf9, 0xc1, 0xed, 0xa0, 0x7b, 0x2a, 0x9d, 0xb5, 0x69, 0x01, 0x51, - 0xce, 0xa1, 0xbd, 0xe9, 0x0d, 0x82, 0xda, 0xca, 0x4b, 0x57, 0x4f, 0xa2, 0xfc, 0x53, 0x20, 0xd9, - 0x86, 0x71, 0x24, 0x0c, 0x86, 0x2f, 0x41, 0x2e, 0xfc, 0x80, 0xe1, 0x36, 0xd4, 0xaf, 0x4c, 0x7b, - 0x8a, 0x2a, 0xb8, 0x0b, 0x6d, 0xc3, 0xd2, 0x0c, 0x9d, 0x58, 0x2e, 0x92, 0x86, 0x0b, 0xe8, 0x16, - 0x97, 0x1f, 0xf7, 0xa0, 0x63, 0xd9, 0x33, 0x93, 0xdc, 0x10, 0xd3, 0x41, 0x15, 0x0c, 0xd0, 0xd4, - 0x4c, 0xdb, 0x21, 0x3a, 0x92, 0x38, 0xa5, 0xd9, 0xd6, 0x6b, 0xe2, 0xb8, 0x44, 0x47, 0x35, 0xdc, - 0x07, 0x70, 0x4c, 0x7b, 0x3a, 0x73, 0xc6, 0x84, 0xe8, 0xa8, 0xc1, 0xe9, 0x2b, 0x4a, 0xc8, 0x4c, - 0x1c, 0xd3, 0xc2, 0x32, 0xb4, 0x26, 0xd6, 0x1b, 0xcb, 0x9e, 0x5a, 0x08, 0x86, 0x5f, 0x41, 0x33, - 0x5d, 0x77, 0xdc, 0x82, 0x9a, 0xaa, 0xeb, 0x69, 0xe6, 0xc9, 0x58, 0x57, 0x5d, 0x82, 0x24, 0xfe, - 0xad, 0x13, 0x93, 0xb8, 0x04, 0x55, 0x87, 0xff, 0x4a, 0xd0, 0x2d, 0x2e, 0x26, 0xaf, 0x57, 0xd5, - 0xb2, 0x7a, 0x2b, 0xfc, 0xd4, 0xac, 0x08, 0xc3, 0xb6, 0x90, 0x84, 0x11, 0x74, 0x35, 0xdb, 0x72, - 0x5c, 0x3a, 0xd1, 0x04, 0x52, 0xc5, 0x47, 0x80, 0x74, 0xc3, 0x51, 0x2f, 0x4d, 0xa2, 0xcf, 0x6e, - 0xc8, 0xc8, 0xd0, 0x4c, 0x82, 0x6a, 0xb8, 0x03, 0x0d, 0x72, 0xc3, 0x53, 0xd4, 0xf1, 0x21, 0xf4, - 0xae, 0x0d, 0x47, 0x23, 0xa6, 0xa9, 0x5a, 0xc4, 0x9e, 0x38, 0xa8, 0xc1, 0xa1, 0xb1, 0xa9, 0x5a, - 0x16, 0xd1, 0x67, 0xa9, 0x57, 0x13, 0x1f, 0x80, 0x4c, 0x6d, 0x55, 0x9f, 0x8d, 0xd4, 0x5f, 0x55, - 0xaa, 0xa3, 0x16, 0x3e, 0x86, 0x17, 0x8e, 0x36, 0x22, 0xfa, 0x84, 0x27, 0x2e, 0x9d, 0xd9, 0xe6, - 0x62, 0xa7, 0x44, 0x75, 0x47, 0x84, 0xa2, 0x0e, 0x57, 0x33, 0xb6, 0x4d, 0x43, 0x23, 0x08, 0x78, - 0x79, 0x22, 0x0b, 0x6f, 0xe2, 0x84, 0x12, 0x24, 0xf3, 0xf2, 0x52, 0xc4, 0xb6, 0x5c, 0xaa, 0xa6, - 0x09, 0xba, 0x43, 0x67, 0xfb, 0xca, 0x6c, 0xae, 0x8f, 0x0c, 0xad, 0x4b, 0xd3, 0xd6, 0xde, 0x10, - 0x9a, 0x4e, 0x4d, 0xa3, 0x86, 0x6b, 0x68, 0xaa, 0x89, 0x24, 0xae, 0xe6, 0x5a, 0xfd, 0xc5, 0xa6, - 0xa8, 0x2a, 0x3e, 0x0d, 0xcb, 0xa6, 0xe9, 0x44, 0xf8, 0x40, 0x8c, 0xeb, 0xb1, 0xaa, 0xb9, 0xa8, - 0x7e, 0x31, 0xd9, 0xfc, 0x57, 0x72, 0x58, 0xf4, 0xe0, 0xcf, 0xf9, 0xb5, 0xee, 0xbf, 0x66, 0x49, - 0x06, 0xea, 0x5e, 0xe2, 0xe1, 0xe7, 0xe5, 0x17, 0x30, 0xfb, 0x9d, 0x3c, 0x7e, 0xb1, 0x0b, 0xa7, - 0x2f, 0x9f, 0x52, 0xf9, 0x56, 0xfa, 0xad, 0x29, 0xfe, 0x91, 0x7d, 0xff, 0x5f, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x7e, 0xe1, 0xa3, 0x35, 0xa0, 0x09, 0x00, 0x00, + proto.RegisterEnum("trafficproxy.TrafficType", TrafficType_name, TrafficType_value) + proto.RegisterEnum("trafficproxy.TrafficLevel", TrafficLevel_name, TrafficLevel_value) + proto.RegisterEnum("trafficproxy.Action", Action_name, Action_value) + proto.RegisterEnum("trafficproxy.IncidentType", IncidentType_name, IncidentType_value) + proto.RegisterEnum("trafficproxy.IncidentSeverity", IncidentSeverity_name, IncidentSeverity_value) + proto.RegisterType((*TrafficRequest)(nil), "trafficproxy.TrafficRequest") + proto.RegisterType((*TrafficAllRequest)(nil), "trafficproxy.TrafficAllRequest") + proto.RegisterType((*TrafficWayIDsRequest)(nil), "trafficproxy.TrafficWayIDsRequest") + proto.RegisterType((*TrafficStreamingDeltaRequest)(nil), "trafficproxy.TrafficStreamingDeltaRequest") + proto.RegisterType((*TrafficStreamingDeltaRequest_StreamingRule)(nil), "trafficproxy.TrafficStreamingDeltaRequest.StreamingRule") + proto.RegisterType((*TrafficResponse)(nil), "trafficproxy.TrafficResponse") + proto.RegisterType((*TrafficSource)(nil), "trafficproxy.TrafficSource") + proto.RegisterType((*FlowResponse)(nil), "trafficproxy.FlowResponse") + proto.RegisterType((*Flow)(nil), "trafficproxy.Flow") + proto.RegisterType((*IncidentResponse)(nil), "trafficproxy.IncidentResponse") + proto.RegisterType((*Incident)(nil), "trafficproxy.Incident") + proto.RegisterType((*Location)(nil), "trafficproxy.Location") +} + +func init() { proto.RegisterFile("trafficproxy.proto", fileDescriptor_9c857d02e32d0eb6) } + +var fileDescriptor_9c857d02e32d0eb6 = []byte{ + // 1169 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0xdd, 0x8e, 0xdb, 0x44, + 0x1b, 0x8e, 0x37, 0xff, 0x6f, 0x7e, 0x76, 0x76, 0xbe, 0xfd, 0xaa, 0xb0, 0x2c, 0x25, 0x8a, 0x44, + 0x15, 0x45, 0xd5, 0x0a, 0x05, 0x0e, 0x90, 0x90, 0x10, 0x8e, 0x3d, 0x6d, 0xdc, 0xba, 0x76, 0x3a, + 0x76, 0xba, 0x88, 0x03, 0x22, 0x37, 0x99, 0x54, 0x16, 0x49, 0x1c, 0x6c, 0xef, 0xb6, 0xcb, 0x05, + 0x70, 0x0c, 0x27, 0x9c, 0x73, 0x2d, 0xdc, 0x00, 0x17, 0xc0, 0x5d, 0x70, 0xc2, 0x21, 0x9a, 0xb1, + 0x9d, 0xd8, 0x89, 0xb7, 0xe2, 0x80, 0xb3, 0xbc, 0xcf, 0xfb, 0xcc, 0xf3, 0xfe, 0xce, 0xc4, 0x80, + 0x43, 0xdf, 0x59, 0x2e, 0xdd, 0xf9, 0xd6, 0xf7, 0xde, 0xdd, 0x5d, 0x6d, 0x7d, 0x2f, 0xf4, 0x70, + 0x33, 0x8d, 0xf5, 0xfe, 0x2c, 0x42, 0xdb, 0x8e, 0x00, 0xca, 0x7e, 0xb8, 0x61, 0x41, 0x88, 0x65, + 0x68, 0xc5, 0x14, 0xcb, 0xbb, 0xf1, 0xe7, 0xac, 0x23, 0x75, 0xa5, 0x7e, 0x63, 0xf8, 0xe1, 0x55, + 0x46, 0xcc, 0x4e, 0x53, 0x68, 0xf6, 0x04, 0xfe, 0x12, 0x1a, 0x31, 0x60, 0xdf, 0x6d, 0x59, 0xe7, + 0xa4, 0x5b, 0xec, 0xb7, 0x87, 0x1f, 0xe4, 0x0a, 0x70, 0x02, 0x4d, 0xb3, 0xb1, 0x09, 0x67, 0xb1, + 0x29, 0xaf, 0x56, 0x71, 0x52, 0x9d, 0xa2, 0xc8, 0xe1, 0xe3, 0x5c, 0x89, 0x3d, 0x6d, 0x5c, 0xa0, + 0xc7, 0x67, 0xf1, 0x37, 0x70, 0x1e, 0x83, 0xd7, 0xce, 0x9d, 0xa6, 0x06, 0x89, 0x66, 0x49, 0x68, + 0xf6, 0x72, 0x35, 0x33, 0xcc, 0x71, 0x81, 0xe6, 0x2a, 0xe0, 0x2d, 0x5c, 0x26, 0x85, 0x87, 0x3e, + 0x73, 0xd6, 0xee, 0xe6, 0x8d, 0xca, 0x56, 0xa1, 0x93, 0x44, 0x28, 0x8b, 0x08, 0x83, 0xfc, 0xce, + 0xe5, 0x9d, 0x18, 0x17, 0xe8, 0x7b, 0x15, 0xf1, 0x25, 0xd4, 0x43, 0x77, 0xcd, 0x82, 0xd0, 0x59, + 0x6f, 0x3b, 0x95, 0xae, 0xd4, 0x2f, 0xd2, 0x3d, 0x30, 0x3a, 0x85, 0x96, 0x1f, 0x11, 0x67, 0xde, + 0x86, 0x79, 0xcb, 0xde, 0xff, 0xe0, 0xec, 0xa8, 0x49, 0xbd, 0x2b, 0x38, 0xcf, 0xab, 0x12, 0x3f, + 0x80, 0xca, 0x5b, 0x01, 0x74, 0xa4, 0x6e, 0xb1, 0x8f, 0x69, 0x6c, 0xf5, 0x7e, 0x97, 0xe0, 0xf2, + 0x7d, 0x49, 0xe3, 0xef, 0xa0, 0x15, 0x24, 0x0e, 0x7a, 0xb3, 0x4a, 0x36, 0xe6, 0x8b, 0x7f, 0x5f, + 0xf7, 0x95, 0x95, 0x3e, 0x4f, 0xb3, 0x72, 0x17, 0x0a, 0xb4, 0x32, 0x7e, 0xdc, 0x81, 0xea, 0xda, + 0x79, 0x67, 0xb9, 0x3f, 0x46, 0xa1, 0xca, 0x34, 0x31, 0x63, 0x8f, 0xed, 0xae, 0xf9, 0xd6, 0x25, + 0x1e, 0x6e, 0xf6, 0x7e, 0x93, 0xe0, 0x74, 0xb7, 0xe9, 0xc1, 0xd6, 0xdb, 0x04, 0x0c, 0x7f, 0x0d, + 0xad, 0xe5, 0xca, 0x7b, 0x9b, 0xd8, 0x51, 0xe1, 0x8d, 0xe1, 0x45, 0x36, 0xf1, 0x27, 0x29, 0x0a, + 0xcd, 0x1e, 0xc0, 0x3a, 0x9c, 0xb9, 0x9b, 0xb9, 0xbb, 0x60, 0x9b, 0x70, 0xaf, 0x72, 0x22, 0x54, + 0x1e, 0x66, 0x55, 0xb4, 0x03, 0x1a, 0x3d, 0x3e, 0xd8, 0xfb, 0x45, 0x82, 0x56, 0xe6, 0x62, 0xf1, + 0x99, 0xf8, 0xec, 0x8d, 0xeb, 0x6d, 0x44, 0xa1, 0x75, 0x1a, 0x5b, 0xb8, 0x0f, 0xa7, 0xb1, 0xfa, + 0xc4, 0xf7, 0x6e, 0xdd, 0x05, 0xf3, 0x45, 0xbd, 0x75, 0x7a, 0x08, 0xe3, 0x2e, 0x34, 0xd6, 0xce, + 0x76, 0xc7, 0x2a, 0x0a, 0x56, 0x1a, 0xe2, 0x3b, 0x15, 0xdc, 0xbc, 0x8e, 0xc3, 0x94, 0xba, 0xc5, + 0x7e, 0x9d, 0xee, 0x81, 0xde, 0x4f, 0x12, 0x34, 0xd3, 0x1d, 0xc0, 0x8f, 0xa0, 0xc4, 0x7b, 0x10, + 0x0f, 0x19, 0xe7, 0xf4, 0x4a, 0xf8, 0xf1, 0x63, 0xa8, 0x38, 0xf3, 0x90, 0x6b, 0xf2, 0xcc, 0xda, + 0xc3, 0xf3, 0x2c, 0x53, 0x16, 0x3e, 0x1a, 0x73, 0x78, 0x9a, 0xa1, 0xef, 0x04, 0xc2, 0xd0, 0xd4, + 0x24, 0xcd, 0x14, 0xd4, 0xfb, 0x59, 0x82, 0x12, 0x97, 0xc7, 0xe7, 0x50, 0x16, 0x9b, 0x29, 0x32, + 0xc0, 0x34, 0x32, 0x38, 0x1a, 0x6c, 0x19, 0x5b, 0x88, 0x68, 0x27, 0x34, 0x32, 0xf0, 0x57, 0x90, + 0xbc, 0x77, 0x3a, 0xbb, 0x65, 0x2b, 0xa1, 0xdb, 0x3e, 0x1c, 0xb0, 0x9d, 0x62, 0xd0, 0x0c, 0x3f, + 0x7b, 0xdf, 0x4a, 0x07, 0xf7, 0xad, 0xf7, 0xab, 0x04, 0xe8, 0x70, 0xae, 0x78, 0x08, 0xb5, 0x64, + 0xb2, 0x71, 0x8f, 0x1e, 0xdc, 0xb3, 0x09, 0x3b, 0xde, 0x7f, 0xde, 0xab, 0x3f, 0x4a, 0x50, 0x4b, + 0xc2, 0xe0, 0x87, 0x00, 0x49, 0xa0, 0xb8, 0x69, 0x75, 0x9a, 0x42, 0xf0, 0x23, 0x68, 0x3b, 0xcb, + 0x25, 0x9b, 0x87, 0x6c, 0x11, 0x3d, 0x08, 0x62, 0x81, 0x31, 0x3d, 0x40, 0x79, 0x2f, 0x93, 0x53, + 0xe2, 0x59, 0xcf, 0xed, 0xa5, 0x96, 0x62, 0xd0, 0x0c, 0x1f, 0x3f, 0x03, 0x94, 0xd8, 0x16, 0xbb, + 0x65, 0xbe, 0x1b, 0xde, 0x89, 0x96, 0xb6, 0xef, 0xbb, 0x2a, 0x09, 0x8b, 0x1e, 0x9d, 0xc3, 0xa3, + 0xbd, 0x96, 0xee, 0xcd, 0x1d, 0xd1, 0xba, 0x72, 0x5e, 0xb3, 0x13, 0x2f, 0x3d, 0xe2, 0xf3, 0x36, + 0x2e, 0x58, 0x30, 0xf7, 0xdd, 0xad, 0x38, 0x5e, 0x89, 0xda, 0x98, 0x82, 0xf0, 0x00, 0xd0, 0xd2, + 0xf5, 0x83, 0x50, 0xf1, 0xbd, 0x20, 0xe0, 0x4f, 0x10, 0x0b, 0x3b, 0x55, 0x41, 0x3b, 0xc2, 0xf1, + 0x63, 0x38, 0x0b, 0xd8, 0xdc, 0xdb, 0x2c, 0xd2, 0xe4, 0x9a, 0x20, 0x1f, 0x3b, 0xf8, 0x4c, 0x02, + 0xf1, 0xcb, 0x70, 0xd6, 0xac, 0x53, 0x8f, 0x66, 0xb2, 0x47, 0xf8, 0xde, 0xb1, 0x5b, 0xb6, 0x09, + 0x15, 0x6f, 0xc1, 0x3a, 0x20, 0x5e, 0xb2, 0x3d, 0x80, 0x3f, 0x87, 0xff, 0x3b, 0x2b, 0xe6, 0x87, + 0x0a, 0xe1, 0xd0, 0xcb, 0x1b, 0x67, 0x13, 0xba, 0x4b, 0x97, 0xf9, 0x9d, 0x86, 0x60, 0xe6, 0x3b, + 0xc5, 0x1e, 0x04, 0xa3, 0x95, 0x37, 0xff, 0xde, 0xdd, 0xbc, 0xe9, 0x34, 0xbb, 0x52, 0xbf, 0x46, + 0x53, 0x48, 0x76, 0xd7, 0x5b, 0x87, 0xbb, 0x7e, 0x05, 0xb5, 0x5d, 0xe7, 0x10, 0x14, 0x57, 0x4e, + 0xb4, 0xdd, 0x12, 0xe5, 0x3f, 0x05, 0x12, 0x6f, 0x2f, 0x47, 0xbc, 0xcd, 0xe0, 0x13, 0x68, 0xa4, + 0xfe, 0xe2, 0x71, 0x0d, 0x4a, 0x4f, 0x74, 0xf3, 0x1a, 0x15, 0x70, 0x13, 0x6a, 0x9a, 0xa1, 0x68, + 0x2a, 0x31, 0x6c, 0x24, 0x0d, 0x16, 0xd0, 0x4c, 0x5f, 0x3f, 0xdc, 0x82, 0xba, 0x61, 0xce, 0x74, + 0xf2, 0x8a, 0xe8, 0x16, 0x2a, 0x60, 0x80, 0x8a, 0xa2, 0x9b, 0x16, 0x51, 0x91, 0xc4, 0x5d, 0x8a, + 0x69, 0x3c, 0x25, 0x96, 0x4d, 0x54, 0x54, 0xc4, 0x6d, 0x00, 0x4b, 0x37, 0xaf, 0x67, 0xd6, 0x84, + 0x10, 0x15, 0x95, 0xb9, 0xfb, 0x09, 0x25, 0x64, 0x26, 0xc2, 0x54, 0x71, 0x03, 0xaa, 0x53, 0xe3, + 0xb9, 0x61, 0x5e, 0x1b, 0x08, 0x06, 0x5d, 0xa8, 0x44, 0x77, 0x88, 0x0b, 0x4e, 0x27, 0xaa, 0x6c, + 0x93, 0x48, 0x5c, 0x25, 0x3a, 0xb1, 0x09, 0x92, 0x06, 0x7f, 0x49, 0xd0, 0x4c, 0xef, 0x2e, 0x4f, + 0x53, 0x56, 0xe2, 0x34, 0x0b, 0x3c, 0x58, 0x1c, 0x5b, 0x33, 0x0d, 0x24, 0x61, 0x04, 0x4d, 0xc5, + 0x34, 0x2c, 0x9b, 0x4e, 0x15, 0x81, 0x9c, 0xe0, 0x73, 0x40, 0xaa, 0x66, 0xc9, 0x23, 0x9d, 0xa8, + 0xb3, 0x57, 0x64, 0xac, 0x29, 0x3a, 0x41, 0x45, 0x5c, 0x87, 0x32, 0x79, 0xc5, 0x25, 0x4a, 0xf8, + 0x0c, 0x5a, 0x2f, 0x34, 0x4b, 0x21, 0xba, 0x2e, 0x1b, 0xc4, 0x9c, 0x5a, 0xa8, 0xcc, 0xa1, 0x89, + 0x2e, 0x1b, 0x06, 0x51, 0x67, 0x11, 0xab, 0x82, 0x4f, 0xa1, 0x41, 0x4d, 0x59, 0x9d, 0x8d, 0xe5, + 0x6f, 0x65, 0xaa, 0xa2, 0x2a, 0xbe, 0x80, 0x07, 0x96, 0x32, 0x26, 0xea, 0x94, 0x0b, 0x67, 0x62, + 0xd6, 0x78, 0x8d, 0xd7, 0x44, 0xb6, 0xc7, 0x84, 0xa2, 0x3a, 0xaf, 0x66, 0x62, 0xea, 0x9a, 0x42, + 0x10, 0xf0, 0xf4, 0x84, 0x0a, 0xef, 0xdd, 0x94, 0x12, 0xd4, 0xe0, 0xe9, 0x45, 0x88, 0x69, 0xd8, + 0x54, 0x8e, 0x04, 0x9a, 0x03, 0x6b, 0xff, 0x7e, 0xed, 0xae, 0x56, 0x03, 0xaa, 0x23, 0xdd, 0x54, + 0x9e, 0x13, 0x1a, 0x0d, 0x4b, 0xa1, 0x9a, 0xad, 0x29, 0xb2, 0x8e, 0x24, 0x5e, 0xcd, 0x0b, 0xf9, + 0x99, 0x49, 0xd1, 0x89, 0xf8, 0xa9, 0x19, 0x26, 0x8d, 0x06, 0xc1, 0xe7, 0xa0, 0xbd, 0x98, 0xc8, + 0x8a, 0x8d, 0x4a, 0xc3, 0xf9, 0xee, 0x93, 0xd2, 0x62, 0xfe, 0xad, 0x3b, 0x67, 0xf8, 0x25, 0xb4, + 0x9f, 0xb2, 0x30, 0x06, 0x55, 0x27, 0x74, 0xf0, 0x65, 0xee, 0x0b, 0x1c, 0x7f, 0x0d, 0x5c, 0x7c, + 0x74, 0x8f, 0x37, 0x7a, 0x61, 0x7b, 0x85, 0x4f, 0xa5, 0x51, 0xf1, 0x6f, 0x49, 0x7a, 0x5d, 0x11, + 0x9f, 0xb4, 0x9f, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0xfa, 0x02, 0x4d, 0xaa, 0xe8, 0x0a, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1022,7 +1075,7 @@ func NewTrafficServiceClient(cc *grpc.ClientConn) TrafficServiceClient { } func (c *trafficServiceClient) GetTrafficData(ctx context.Context, in *TrafficRequest, opts ...grpc.CallOption) (TrafficService_GetTrafficDataClient, error) { - stream, err := c.cc.NewStream(ctx, &_TrafficService_serviceDesc.Streams[0], "/proxy.TrafficService/GetTrafficData", opts...) + stream, err := c.cc.NewStream(ctx, &_TrafficService_serviceDesc.Streams[0], "/trafficproxy.TrafficService/GetTrafficData", opts...) if err != nil { return nil, err } @@ -1092,7 +1145,7 @@ func (x *trafficServiceGetTrafficDataServer) Send(m *TrafficResponse) error { } var _TrafficService_serviceDesc = grpc.ServiceDesc{ - ServiceName: "proxy.TrafficService", + ServiceName: "trafficproxy.TrafficService", HandlerType: (*TrafficServiceServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ @@ -1102,5 +1155,5 @@ var _TrafficService_serviceDesc = grpc.ServiceDesc{ ServerStreams: true, }, }, - Metadata: "proxy.proto", + Metadata: "trafficproxy.proto", } diff --git a/integration/pkg/trafficproxyclient/feeder.go b/integration/pkg/trafficproxyclient/feeder.go index 2f90e23c5e7..b1a04ee074d 100644 --- a/integration/pkg/trafficproxyclient/feeder.go +++ b/integration/pkg/trafficproxyclient/feeder.go @@ -7,7 +7,7 @@ import ( "github.com/golang/glog" "github.com/Telenav/osrm-backend/integration/pkg/trafficeater" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) // Feeder will continuesly feed traffic flows and incidents. @@ -30,7 +30,7 @@ func (f *Feeder) RegisterEaters(e ...trafficeater.Eater) { // It'll block until `Shutdown` called or some error occurred. func (f *Feeder) Run() error { - feeds := make(chan proxy.TrafficResponse) + feeds := make(chan trafficproxy.TrafficResponse) // feed eater waitFeedingDone := make(chan struct{}) @@ -81,7 +81,7 @@ func (f *Feeder) Shutdown() { //TODO: } -func (f *Feeder) feed(in <-chan proxy.TrafficResponse) { +func (f *Feeder) feed(in <-chan trafficproxy.TrafficResponse) { for { resp, ok := <-in if !ok { diff --git a/integration/pkg/trafficproxyclient/flags.go b/integration/pkg/trafficproxyclient/flags.go index 32975d87001..2cd72f98f93 100644 --- a/integration/pkg/trafficproxyclient/flags.go +++ b/integration/pkg/trafficproxyclient/flags.go @@ -8,6 +8,7 @@ import ( var flags struct { target string region string + subregion string trafficProvider string mapProvider string flow bool @@ -20,6 +21,7 @@ var flags struct { func init() { flag.StringVar(&flags.target, "c", "127.0.0.1:10086", "target traffic proxy endpoint, format 'ip[:port]'. Default port 10086 will be used if only ip input, i.e '127.0.0.1' is same with '127.0.0.1:10086'.") flag.StringVar(&flags.region, "region", "na", "region") + flag.StringVar(&flags.subregion, "subregion", "", "subregion. Leave empty if requires full region data, otherwise use ',' split if many, e.g. 'us,ca'") flag.StringVar(&flags.trafficProvider, "traffic", "", "traffic data provider") flag.StringVar(&flags.mapProvider, "map", "", "map data provider") flag.BoolVar(&flags.flow, "flow", true, "Enable traffic flow.") diff --git a/integration/pkg/trafficproxyclient/get.go b/integration/pkg/trafficproxyclient/get.go index 9520013e811..b41c277f125 100644 --- a/integration/pkg/trafficproxyclient/get.go +++ b/integration/pkg/trafficproxyclient/get.go @@ -6,16 +6,16 @@ import ( "io" "time" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) // GetFlowsIncidents return flows and incidents for wayIds or full region. -func GetFlowsIncidents(wayIds []int64) (*proxy.TrafficResponse, error) { - var outTrafficResponse proxy.TrafficResponse +func GetFlowsIncidents(wayIDs []int64) (*trafficproxy.TrafficResponse, error) { + var outTrafficResponse trafficproxy.TrafficResponse forStr := "all" - if len(wayIds) > 0 { - forStr = fmt.Sprintf("%d wayIds", len(wayIds)) + if len(wayIDs) > 0 { + forStr = fmt.Sprintf("%d wayIds", len(wayIDs)) } startTime := time.Now() @@ -36,22 +36,22 @@ func GetFlowsIncidents(wayIds []int64) (*proxy.TrafficResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), params{}.rpcGetTimeout()) defer cancel() - // new proxy client - client := proxy.NewTrafficServiceClient(conn) + // new trafficproxy client + client := trafficproxy.NewTrafficServiceClient(conn) // get flows glog.Infof("getting flows,incidents for %s\n", forStr) - var req proxy.TrafficRequest + var req trafficproxy.TrafficRequest req.TrafficSource = params{}.newTrafficSource() req.TrafficType = params{}.newTrafficType() - if len(wayIds) > 0 { - var trafficWayIdsRequest proxy.TrafficRequest_TrafficWayIdsRequest - trafficWayIdsRequest.TrafficWayIdsRequest = new(proxy.TrafficWayIdsRequest) - trafficWayIdsRequest.TrafficWayIdsRequest.WayIds = wayIds - req.RequestOneof = &trafficWayIdsRequest + if len(wayIDs) > 0 { + var trafficWayIDsRequest trafficproxy.TrafficRequest_TrafficWayIDsRequest + trafficWayIDsRequest.TrafficWayIDsRequest = new(trafficproxy.TrafficWayIDsRequest) + trafficWayIDsRequest.TrafficWayIDsRequest.WayIDs = wayIDs + req.RequestOneof = &trafficWayIDsRequest } else { - trafficAllRequest := new(proxy.TrafficRequest_TrafficAllRequest) - trafficAllRequest.TrafficAllRequest = new(proxy.TrafficAllRequest) + trafficAllRequest := new(trafficproxy.TrafficRequest_TrafficAllRequest) + trafficAllRequest.TrafficAllRequest = new(trafficproxy.TrafficAllRequest) req.RequestOneof = trafficAllRequest } diff --git a/integration/pkg/trafficproxyclient/params.go b/integration/pkg/trafficproxyclient/params.go index 05ccff7dfce..1b187e1ff0f 100644 --- a/integration/pkg/trafficproxyclient/params.go +++ b/integration/pkg/trafficproxyclient/params.go @@ -1,35 +1,42 @@ package trafficproxyclient import ( + "strings" "time" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) // params is used to group request parameters together. type params struct{} -func (p params) newTrafficSource() *proxy.TrafficSource { - t := proxy.TrafficSource{} +func (p params) newTrafficSource() *trafficproxy.TrafficSource { + t := trafficproxy.TrafficSource{} t.Region = flags.region t.TrafficProvider = flags.trafficProvider t.MapProvider = flags.mapProvider + + t.Subregion = []string{} + if len(flags.subregion) > 0 { + subregionStr := strings.TrimSuffix(flags.subregion, ",") // trim last ',' if exist + t.Subregion = strings.Split(subregionStr, ",") + } return &t } -func (p params) newTrafficType() []proxy.TrafficType { - t := []proxy.TrafficType{} +func (p params) newTrafficType() []trafficproxy.TrafficType { + t := []trafficproxy.TrafficType{} if flags.flow { - t = append(t, proxy.TrafficType_FLOW) + t = append(t, trafficproxy.TrafficType_FLOW) } if flags.incident { - t = append(t, proxy.TrafficType_INCIDENT) + t = append(t, trafficproxy.TrafficType_INCIDENT) } return t } -func (p params) newStreamingRule() *proxy.TrafficStreamingDeltaRequest_StreamingRule { - var r proxy.TrafficStreamingDeltaRequest_StreamingRule +func (p params) newStreamingRule() *trafficproxy.TrafficStreamingDeltaRequest_StreamingRule { + var r trafficproxy.TrafficStreamingDeltaRequest_StreamingRule r.MaxSize = int32(flags.streamingDeltaMaxSize) r.MaxTime = int32(flags.streamingDeltaMaxTime.Seconds()) return &r diff --git a/integration/pkg/trafficproxyclient/streaming_delta.go b/integration/pkg/trafficproxyclient/streaming_delta.go index caac3e33cb8..6f1f527feec 100644 --- a/integration/pkg/trafficproxyclient/streaming_delta.go +++ b/integration/pkg/trafficproxyclient/streaming_delta.go @@ -5,12 +5,12 @@ import ( "fmt" "io" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) // StreamingDeltaFlowsIncidents set up a new channel for traffic flows and incidents streaming delta. -func StreamingDeltaFlowsIncidents(out chan<- proxy.TrafficResponse) error { +func StreamingDeltaFlowsIncidents(out chan<- trafficproxy.TrafficResponse) error { // make RPC client conn, err := newGRPCConnection() @@ -23,15 +23,15 @@ func StreamingDeltaFlowsIncidents(out chan<- proxy.TrafficResponse) error { ctx := context.Background() // new proxy client - client := proxy.NewTrafficServiceClient(conn) + client := trafficproxy.NewTrafficServiceClient(conn) // get flows via stream glog.Info("streaming delta traffic flows,incidents") - var req proxy.TrafficRequest + var req trafficproxy.TrafficRequest req.TrafficSource = params{}.newTrafficSource() req.TrafficType = params{}.newTrafficType() - trafficDeltaStreamRequest := new(proxy.TrafficRequest_TrafficStreamingDeltaRequest) - trafficDeltaStreamRequest.TrafficStreamingDeltaRequest = new(proxy.TrafficStreamingDeltaRequest) + trafficDeltaStreamRequest := new(trafficproxy.TrafficRequest_TrafficStreamingDeltaRequest) + trafficDeltaStreamRequest.TrafficStreamingDeltaRequest = new(trafficproxy.TrafficStreamingDeltaRequest) trafficDeltaStreamRequest.TrafficStreamingDeltaRequest.StreamingRule = params{}.newStreamingRule() req.RequestOneof = trafficDeltaStreamRequest diff --git a/integration/trafficcache/flowscache/flows.go b/integration/trafficcache/flowscache/flows.go index 71fad7328b6..d48f5b45abc 100644 --- a/integration/trafficcache/flowscache/flows.go +++ b/integration/trafficcache/flowscache/flows.go @@ -3,19 +3,19 @@ package flowscache import ( "sync" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) // Cache stores flows in memory. type Cache struct { m sync.RWMutex - flows map[int64]*proxy.Flow + flows map[int64]*trafficproxy.Flow } // New creates a new cache to store flows in memory. func New() *Cache { - return &Cache{sync.RWMutex{}, map[int64]*proxy.Flow{}} + return &Cache{sync.RWMutex{}, map[int64]*trafficproxy.Flow{}} } //Clear clear the cache. @@ -23,11 +23,11 @@ func (c *Cache) Clear() { c.m.Lock() defer c.m.Unlock() - c.flows = map[int64]*proxy.Flow{} + c.flows = map[int64]*trafficproxy.Flow{} } // Query returns Live Traffic Flow if exist. -func (c *Cache) Query(wayID int64) *proxy.Flow { +func (c *Cache) Query(wayID int64) *trafficproxy.Flow { c.m.RLock() defer c.m.RUnlock() @@ -46,16 +46,22 @@ func (c *Cache) Count() int64 { } // Update updates flows in cache. -func (c *Cache) Update(flowResp []*proxy.FlowResponse) { +func (c *Cache) Update(flowResp []*trafficproxy.FlowResponse) { c.m.Lock() defer c.m.Unlock() for _, f := range flowResp { - if f.Action == proxy.Action_UPDATE || f.Action == proxy.Action_ADD { //TODO: Action_ADD will be removed soon - c.flows[f.Flow.WayId] = f.Flow + if f.Action == trafficproxy.Action_UPDATE { + if inCacheFlow, ok := c.flows[f.Flow.WayID]; ok { + if inCacheFlow.Timestamp <= f.Flow.Timestamp { + c.flows[f.Flow.WayID] = f.Flow // use newer if exist + } + continue + } + c.flows[f.Flow.WayID] = f.Flow // store if not exist continue - } else if f.Action == proxy.Action_DELETE { - delete(c.flows, f.Flow.WayId) + } else if f.Action == trafficproxy.Action_DELETE { + delete(c.flows, f.Flow.WayID) continue } diff --git a/integration/trafficcache/flowscache/flows_test.go b/integration/trafficcache/flowscache/flows_test.go index cfbbeeba593..cf6b5825ee8 100644 --- a/integration/trafficcache/flowscache/flows_test.go +++ b/integration/trafficcache/flowscache/flows_test.go @@ -4,30 +4,35 @@ import ( "reflect" "testing" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) func TestFlowsCache(t *testing.T) { - presetFlows := []*proxy.Flow{ - &proxy.Flow{WayId: -1112859596, Speed: 6.110000, TrafficLevel: proxy.TrafficLevel_SLOW_SPEED}, - &proxy.Flow{WayId: 119961953, Speed: 10.550000, TrafficLevel: proxy.TrafficLevel_SLOW_SPEED}, - &proxy.Flow{WayId: -112614307, Speed: 16.110001, TrafficLevel: proxy.TrafficLevel_FREE_FLOW}, + presetFlows := []*trafficproxy.Flow{ + &trafficproxy.Flow{WayID: -1112859596, Speed: 6.110000, TrafficLevel: trafficproxy.TrafficLevel_SLOW_SPEED, Timestamp: 1579419488000}, + &trafficproxy.Flow{WayID: 119961953, Speed: 10.550000, TrafficLevel: trafficproxy.TrafficLevel_SLOW_SPEED, Timestamp: 1579419488000}, + &trafficproxy.Flow{WayID: -112614307, Speed: 16.110001, TrafficLevel: trafficproxy.TrafficLevel_FREE_FLOW, Timestamp: 1579419488000}, + } + + updateFlows := []*trafficproxy.Flow{ + &trafficproxy.Flow{WayID: -112614307, Speed: 20.110001, TrafficLevel: trafficproxy.TrafficLevel_FREE_FLOW, Timestamp: 1579419500000}, // newer + &trafficproxy.Flow{WayID: -112614307, Speed: 13.110001, TrafficLevel: trafficproxy.TrafficLevel_FREE_FLOW, Timestamp: 1579419000000}, // older } cache := New() - // update - cache.Update(newFlowResponses(presetFlows, proxy.Action_UPDATE)) + // update preset + cache.Update(newFlowResponses(presetFlows, trafficproxy.Action_UPDATE)) if cache.Count() != int64(len(presetFlows)) { t.Errorf("expect cached flows count %d but got %d", len(presetFlows), cache.Count()) } // query expect sucess for _, f := range presetFlows { - r := cache.Query(f.WayId) + r := cache.Query(f.WayID) if !reflect.DeepEqual(r, f) { - t.Errorf("Query Flow for wayID %d, expect %v but got %v", f.WayId, f, r) + t.Errorf("Query Flow for wayID %d, expect %v but got %v", f.WayID, f, r) } } @@ -40,10 +45,23 @@ func TestFlowsCache(t *testing.T) { } } + // update exists + cache.Update(newFlowResponses(updateFlows, trafficproxy.Action_UPDATE)) + if cache.Count() != int64(len(presetFlows)) { // expect no change + t.Errorf("expect cached flows count %d but got %d", len(presetFlows), cache.Count()) + } + + // query expect sucess + queryExpectFlow := updateFlows[0] + r := cache.Query(queryExpectFlow.WayID) + if !reflect.DeepEqual(r, queryExpectFlow) { + t.Errorf("Query Flow for wayID %d, expect %v but got %v", queryExpectFlow.WayID, queryExpectFlow, r) + } + // delete deleteCount := 2 deleteFlows := presetFlows[:deleteCount] - cache.Update(newFlowResponses(deleteFlows, proxy.Action_DELETE)) + cache.Update(newFlowResponses(deleteFlows, trafficproxy.Action_DELETE)) if cache.Count() != int64(len(presetFlows)-deleteCount) { t.Errorf("expect after delete, cached flows count %d but got %d", len(presetFlows)-deleteCount, cache.Count()) } @@ -56,11 +74,11 @@ func TestFlowsCache(t *testing.T) { } -func newFlowResponses(flows []*proxy.Flow, action proxy.Action) []*proxy.FlowResponse { +func newFlowResponses(flows []*trafficproxy.Flow, action trafficproxy.Action) []*trafficproxy.FlowResponse { - flowResponses := []*proxy.FlowResponse{} + flowResponses := []*trafficproxy.FlowResponse{} for _, f := range flows { - flowResponses = append(flowResponses, &proxy.FlowResponse{Flow: f, Action: action, XXX_NoUnkeyedLiteral: struct{}{}, XXX_unrecognized: nil, XXX_sizecache: 0}) + flowResponses = append(flowResponses, &trafficproxy.FlowResponse{Flow: f, Action: action, XXX_NoUnkeyedLiteral: struct{}{}, XXX_unrecognized: nil, XXX_sizecache: 0}) } return flowResponses } diff --git a/integration/trafficcache/flowscacheindexedbyedge/flows.go b/integration/trafficcache/flowscacheindexedbyedge/flows.go index d4e51df28df..6343c02dd7b 100644 --- a/integration/trafficcache/flowscacheindexedbyedge/flows.go +++ b/integration/trafficcache/flowscacheindexedbyedge/flows.go @@ -6,14 +6,14 @@ import ( "github.com/Telenav/osrm-backend/integration/wayidsmap" "github.com/Telenav/osrm-backend/integration/graph" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) // Cache stores flows in memory. type Cache struct { m sync.RWMutex - flows map[graph.Edge]*proxy.Flow + flows map[graph.Edge]*trafficproxy.Flow affectedWayIDs map[int64]struct{} wayID2Edges wayidsmap.Way2Edges } @@ -26,7 +26,7 @@ func New(wayID2Edges wayidsmap.Way2Edges) *Cache { } return &Cache{sync.RWMutex{}, - map[graph.Edge]*proxy.Flow{}, + map[graph.Edge]*trafficproxy.Flow{}, map[int64]struct{}{}, wayID2Edges} } @@ -36,12 +36,12 @@ func (c *Cache) Clear() { c.m.Lock() defer c.m.Unlock() - c.flows = map[graph.Edge]*proxy.Flow{} + c.flows = map[graph.Edge]*trafficproxy.Flow{} c.affectedWayIDs = map[int64]struct{}{} } // QueryByEdge returns Live Traffic Flow for Edge if exist. -func (c *Cache) QueryByEdge(edge graph.Edge) *proxy.Flow { +func (c *Cache) QueryByEdge(edge graph.Edge) *trafficproxy.Flow { c.m.RLock() defer c.m.RUnlock() @@ -53,11 +53,11 @@ func (c *Cache) QueryByEdge(edge graph.Edge) *proxy.Flow { } // QueryByEdges returns Live Traffic Flows for Edges if exist. -func (c *Cache) QueryByEdges(edges []graph.Edge) []*proxy.Flow { +func (c *Cache) QueryByEdges(edges []graph.Edge) []*trafficproxy.Flow { c.m.RLock() defer c.m.RUnlock() - out := make([]*proxy.Flow, len(edges), len(edges)) + out := make([]*trafficproxy.Flow, len(edges), len(edges)) for i := range edges { v, ok := c.flows[edges[i]] if ok { @@ -84,24 +84,30 @@ func (c *Cache) AffectedWaysCount() int64 { } // Update updates flows in cache. -func (c *Cache) Update(flowResp []*proxy.FlowResponse) { +func (c *Cache) Update(flowResp []*trafficproxy.FlowResponse) { c.m.Lock() defer c.m.Unlock() for _, f := range flowResp { - if f.Action == proxy.Action_UPDATE || f.Action == proxy.Action_ADD { //TODO: Action_ADD will be removed soon - edges := c.wayID2Edges.WayID2Edges(f.Flow.WayId) + if f.Action == trafficproxy.Action_UPDATE { + edges := c.wayID2Edges.WayID2Edges(f.Flow.WayID) for _, e := range edges { - c.flows[e] = f.Flow + if inCacheFlow, ok := c.flows[e]; ok { + if inCacheFlow.Timestamp <= f.Flow.Timestamp { + c.flows[e] = f.Flow // use newer if exist + } + continue + } + c.flows[e] = f.Flow // store if not exist } - c.affectedWayIDs[f.Flow.WayId] = struct{}{} + c.affectedWayIDs[f.Flow.WayID] = struct{}{} continue - } else if f.Action == proxy.Action_DELETE { - edges := c.wayID2Edges.WayID2Edges(f.Flow.WayId) + } else if f.Action == trafficproxy.Action_DELETE { + edges := c.wayID2Edges.WayID2Edges(f.Flow.WayID) for _, e := range edges { delete(c.flows, e) } - delete(c.affectedWayIDs, f.Flow.WayId) + delete(c.affectedWayIDs, f.Flow.WayID) continue } diff --git a/integration/trafficcache/flowscacheindexedbyedge/flows_test.go b/integration/trafficcache/flowscacheindexedbyedge/flows_test.go index 3a1e5da8fd1..5c20dc720d3 100644 --- a/integration/trafficcache/flowscacheindexedbyedge/flows_test.go +++ b/integration/trafficcache/flowscacheindexedbyedge/flows_test.go @@ -5,15 +5,20 @@ import ( "testing" "github.com/Telenav/osrm-backend/integration/graph" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) func TestFlowsCache(t *testing.T) { - presetFlows := []*proxy.Flow{ - &proxy.Flow{WayId: -1112859596, Speed: 6.110000, TrafficLevel: proxy.TrafficLevel_SLOW_SPEED}, - &proxy.Flow{WayId: 119961953, Speed: 10.550000, TrafficLevel: proxy.TrafficLevel_SLOW_SPEED}, - &proxy.Flow{WayId: -112614307, Speed: 16.110001, TrafficLevel: proxy.TrafficLevel_FREE_FLOW}, + presetFlows := []*trafficproxy.Flow{ + &trafficproxy.Flow{WayID: -1112859596, Speed: 6.110000, TrafficLevel: trafficproxy.TrafficLevel_SLOW_SPEED, Timestamp: 1579419488000}, + &trafficproxy.Flow{WayID: 119961953, Speed: 10.550000, TrafficLevel: trafficproxy.TrafficLevel_SLOW_SPEED, Timestamp: 1579419488000}, + &trafficproxy.Flow{WayID: -112614307, Speed: 16.110001, TrafficLevel: trafficproxy.TrafficLevel_FREE_FLOW, Timestamp: 1579419488000}, + } + + updateFlows := []*trafficproxy.Flow{ + &trafficproxy.Flow{WayID: -112614307, Speed: 20.110001, TrafficLevel: trafficproxy.TrafficLevel_FREE_FLOW, Timestamp: 1579419500000}, // newer + &trafficproxy.Flow{WayID: -112614307, Speed: 13.110001, TrafficLevel: trafficproxy.TrafficLevel_FREE_FLOW, Timestamp: 1579419000000}, // older } wayid2NodeIDsMapping := wayID2NodeIDs{ @@ -25,7 +30,7 @@ func TestFlowsCache(t *testing.T) { cache := New(wayid2NodeIDsMapping) // update - cache.Update(newFlowResponses(presetFlows, proxy.Action_UPDATE)) + cache.Update(newFlowResponses(presetFlows, trafficproxy.Action_UPDATE)) expectedFlowsIndexedByEdgeCount := int64(12) if cache.Count() != expectedFlowsIndexedByEdgeCount { t.Errorf("expect flows count %d but got %d", expectedFlowsIndexedByEdgeCount, cache.Count()) @@ -65,10 +70,27 @@ func TestFlowsCache(t *testing.T) { } } + // update exists + cache.Update(newFlowResponses(updateFlows, trafficproxy.Action_UPDATE)) + if cache.Count() != expectedFlowsIndexedByEdgeCount { + t.Errorf("expect flows count %d but got %d", expectedFlowsIndexedByEdgeCount, cache.Count()) + } + if cache.AffectedWaysCount() != int64(len(presetFlows)) { // expect no change + t.Errorf("expect flows affected ways count %d but got %d", len(presetFlows), cache.AffectedWaysCount()) + } + + // query expect sucess + queryExpectFlow := updateFlows[0] + queryEdge := graph.Edge{From: 123456789002, To: 123456789021} + r := cache.QueryByEdge(queryEdge) + if !reflect.DeepEqual(r, queryExpectFlow) { + t.Errorf("Query Flow for Edge %v, expect %v but got %v", queryEdge, queryExpectFlow, r) + } + // delete deleteCount := 2 deleteFlows := presetFlows[:deleteCount] - cache.Update(newFlowResponses(deleteFlows, proxy.Action_DELETE)) + cache.Update(newFlowResponses(deleteFlows, trafficproxy.Action_DELETE)) expectedFlowsIndexedByEdgeCount = int64(5) if cache.Count() != expectedFlowsIndexedByEdgeCount { t.Errorf("expect flows count %d but got %d", expectedFlowsIndexedByEdgeCount, cache.Count()) @@ -85,11 +107,11 @@ func TestFlowsCache(t *testing.T) { } -func newFlowResponses(flows []*proxy.Flow, action proxy.Action) []*proxy.FlowResponse { +func newFlowResponses(flows []*trafficproxy.Flow, action trafficproxy.Action) []*trafficproxy.FlowResponse { - flowResponses := []*proxy.FlowResponse{} + flowResponses := []*trafficproxy.FlowResponse{} for _, f := range flows { - flowResponses = append(flowResponses, &proxy.FlowResponse{Flow: f, Action: action, XXX_NoUnkeyedLiteral: struct{}{}, XXX_unrecognized: nil, XXX_sizecache: 0}) + flowResponses = append(flowResponses, &trafficproxy.FlowResponse{Flow: f, Action: action, XXX_NoUnkeyedLiteral: struct{}{}, XXX_unrecognized: nil, XXX_sizecache: 0}) } return flowResponses } diff --git a/integration/trafficcache/incidentscache/incidents_test.go b/integration/trafficcache/incidentscache/incidents_test.go index 20bc96078b2..82242d09959 100644 --- a/integration/trafficcache/incidentscache/incidents_test.go +++ b/integration/trafficcache/incidentscache/incidents_test.go @@ -4,17 +4,17 @@ import ( "testing" "github.com/Telenav/osrm-backend/integration/graph" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) func TestIncidentsCache(t *testing.T) { - presetIncidents := []*proxy.Incident{ - &proxy.Incident{ - IncidentId: "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1", - AffectedWayIds: []int64{100663296, -1204020275, 100663296, -1204020274, 100663296, -916744017, 100663296, -1204020245, 100663296, -1194204646, 100663296, -1204394608, 100663296, -1194204647, 100663296, -129639168, 100663296, -1194204645}, - IncidentType: proxy.IncidentType_MISCELLANEOUS, - IncidentSeverity: proxy.IncidentSeverity_CRITICAL, - IncidentLocation: &proxy.Location{Lat: 44.181220, Lon: -117.135840}, + presetIncidents := []*trafficproxy.Incident{ + &trafficproxy.Incident{ + IncidentID: "TTI-f47b8dba-59a3-372d-9cec-549eb252e2d5-TTR46312939215361-1", + AffectedWayIDs: []int64{100663296, -1204020275, 100663296, -1204020274, 100663296, -916744017, 100663296, -1204020245, 100663296, -1194204646, 100663296, -1204394608, 100663296, -1194204647, 100663296, -129639168, 100663296, -1194204645}, + IncidentType: trafficproxy.IncidentType_MISCELLANEOUS, + IncidentSeverity: trafficproxy.IncidentSeverity_CRITICAL, + IncidentLocation: &trafficproxy.Location{Lat: 44.181220, Lon: -117.135840}, Description: "Construction on I-84 EB near MP 359, Drive with caution.", FirstCrossStreet: "", SecondCrossStreet: "", @@ -22,13 +22,14 @@ func TestIncidentsCache(t *testing.T) { EventCode: 500, AlertCEventQuantifier: 0, IsBlocking: false, + Timestamp: 1579419488000, }, - &proxy.Incident{ - IncidentId: "TTI-6f55a1ca-9a6e-38ef-ac40-0dbd3f5586df-TTR83431311705665-1", - AffectedWayIds: []int64{100663296, 19446119}, - IncidentType: proxy.IncidentType_ACCIDENT, - IncidentSeverity: proxy.IncidentSeverity_CRITICAL, - IncidentLocation: &proxy.Location{Lat: 37.592370, Lon: -77.56735040}, + &trafficproxy.Incident{ + IncidentID: "TTI-6f55a1ca-9a6e-38ef-ac40-0dbd3f5586df-TTR83431311705665-1", + AffectedWayIDs: []int64{100663296, 19446119}, + IncidentType: trafficproxy.IncidentType_ACCIDENT, + IncidentSeverity: trafficproxy.IncidentSeverity_CRITICAL, + IncidentLocation: &trafficproxy.Location{Lat: 37.592370, Lon: -77.56735040}, Description: "Incident on N PARHAM RD near RIDGE RD, Drive with caution.", FirstCrossStreet: "", SecondCrossStreet: "", @@ -36,13 +37,14 @@ func TestIncidentsCache(t *testing.T) { EventCode: 214, AlertCEventQuantifier: 0, IsBlocking: true, + Timestamp: 1579419488000, }, - &proxy.Incident{ - IncidentId: "mock-1", - AffectedWayIds: []int64{100663296, -1204020275, 100643296}, - IncidentType: proxy.IncidentType_ACCIDENT, - IncidentSeverity: proxy.IncidentSeverity_CRITICAL, - IncidentLocation: &proxy.Location{Lat: 37.592370, Lon: -77.56735040}, + &trafficproxy.Incident{ + IncidentID: "mock-1", + AffectedWayIDs: []int64{100663296, -1204020275, 100643296}, + IncidentType: trafficproxy.IncidentType_ACCIDENT, + IncidentSeverity: trafficproxy.IncidentSeverity_CRITICAL, + IncidentLocation: &trafficproxy.Location{Lat: 37.592370, Lon: -77.56735040}, Description: "Incident on N PARHAM RD near RIDGE RD, Drive with caution.", FirstCrossStreet: "", SecondCrossStreet: "", @@ -50,23 +52,58 @@ func TestIncidentsCache(t *testing.T) { EventCode: 214, AlertCEventQuantifier: 0, IsBlocking: true, + Timestamp: 1579419488000, }, } + updateIncidents := []*trafficproxy.Incident{ + &trafficproxy.Incident{ + IncidentID: "mock-1", + AffectedWayIDs: []int64{100663296, -1204020275, 100643296, 111111111}, + IncidentType: trafficproxy.IncidentType_ACCIDENT, + IncidentSeverity: trafficproxy.IncidentSeverity_CRITICAL, + IncidentLocation: &trafficproxy.Location{Lat: 37.592370, Lon: -77.56735040}, + Description: "Incident on N PARHAM RD near RIDGE RD, Drive with caution.", + FirstCrossStreet: "", + SecondCrossStreet: "", + StreetName: "N Parham Rd", + EventCode: 214, + AlertCEventQuantifier: 0, + IsBlocking: true, + Timestamp: 1579419500000, + }, // newer + &trafficproxy.Incident{ + IncidentID: "mock-1", + AffectedWayIDs: []int64{100663296, -1204020275, 100643296}, + IncidentType: trafficproxy.IncidentType_ACCIDENT, + IncidentSeverity: trafficproxy.IncidentSeverity_CRITICAL, + IncidentLocation: &trafficproxy.Location{Lat: 37.592370, Lon: -77.56735040}, + Description: "Incident on N PARHAM RD near RIDGE RD, Drive with caution.", + FirstCrossStreet: "", + SecondCrossStreet: "", + StreetName: "N Parham Rd", + EventCode: 214, + AlertCEventQuantifier: 0, + IsBlocking: true, + Timestamp: 1579419000000, + }, // older + } + wayid2NodeIDsMapping := wayID2NodeIDs{ 1204020274: []int64{123456789001, 123456789002, 123456789003, 123456789004}, 100663296: []int64{123456789011, 123456789012, 123456789003}, 19446119: []int64{123456789021, 123456789002, 123456789023, 123456789024, 123456789025, 123456789026}, 1204020275: []int64{123456789031, 123456789032, 123456789033, 123456789034}, 100643296: []int64{123456789041, 123456789042, 123456789043}, + 111111111: []int64{11111111101, 1111111102, 1111111103}, } cache := New() cacheWithEdgeIndexing := NewWithEdgeIndexing(wayid2NodeIDsMapping) // update - cache.Update(newIncidentsResponses(presetIncidents, proxy.Action_UPDATE)) - cacheWithEdgeIndexing.Update(newIncidentsResponses(presetIncidents, proxy.Action_UPDATE)) + cache.Update(newIncidentsResponses(presetIncidents, trafficproxy.Action_UPDATE)) + cacheWithEdgeIndexing.Update(newIncidentsResponses(presetIncidents, trafficproxy.Action_UPDATE)) expectIncidentsCount := 2 if cache.Count() != expectIncidentsCount || cacheWithEdgeIndexing.Count() != expectIncidentsCount { t.Errorf("expect cached incidents count %d but got %d,%d", expectIncidentsCount, cache.Count(), cacheWithEdgeIndexing.Count()) @@ -116,19 +153,51 @@ func TestIncidentsCache(t *testing.T) { } } + // update + cache.Update(newIncidentsResponses(updateIncidents, trafficproxy.Action_UPDATE)) + cacheWithEdgeIndexing.Update(newIncidentsResponses(updateIncidents, trafficproxy.Action_UPDATE)) + if cache.Count() != expectIncidentsCount || cacheWithEdgeIndexing.Count() != expectIncidentsCount { + t.Errorf("expect cached incidents count %d but got %d,%d", expectIncidentsCount, cache.Count(), cacheWithEdgeIndexing.Count()) + } + expectAffectedWaysCount = 5 // only store blocked incidents + if cache.AffectedWaysCount() != expectAffectedWaysCount || cacheWithEdgeIndexing.AffectedWaysCount() != expectAffectedWaysCount { + t.Errorf("expect cached incidents affect ways count %d but got %d,%d", expectAffectedWaysCount, cache.AffectedWaysCount(), cacheWithEdgeIndexing.AffectedWaysCount()) + } + expectAffectedEdgesCount = 14 + if cacheWithEdgeIndexing.AffectedEdgesCount() != expectAffectedEdgesCount { + t.Errorf("expect cached incidents affect edges count %d but got %d", expectAffectedEdgesCount, cacheWithEdgeIndexing.AffectedEdgesCount()) + } + + // query expect sucess + inCacheWayIDs = []int64{111111111} // only check the updated one + for _, wayID := range inCacheWayIDs { + if !cache.WayBlockedByIncident(wayID) || !cacheWithEdgeIndexing.WayBlockedByIncident(wayID) { + t.Errorf("wayID %d, expect blocked by incident but not", wayID) + } + edges := wayid2NodeIDsMapping.WayID2Edges(wayID) + for _, e := range edges { + if !cacheWithEdgeIndexing.EdgeBlockedByIncident(e) { + t.Errorf("edge %v, expect blocked by incident but not", e) + } + } + if b, i := cacheWithEdgeIndexing.EdgesBlockedByIncidents(edges); !b || i != 0 { + t.Errorf("edges %v, expect blocked by incidents but not", edges) + } + } + // delete deleteIncidents := presetIncidents[:2] - cache.Update(newIncidentsResponses(deleteIncidents, proxy.Action_DELETE)) - cacheWithEdgeIndexing.Update(newIncidentsResponses(deleteIncidents, proxy.Action_DELETE)) + cache.Update(newIncidentsResponses(deleteIncidents, trafficproxy.Action_DELETE)) + cacheWithEdgeIndexing.Update(newIncidentsResponses(deleteIncidents, trafficproxy.Action_DELETE)) expectIncidentsCount = 1 if cache.Count() != expectIncidentsCount || cacheWithEdgeIndexing.Count() != expectIncidentsCount { t.Errorf("expect after delete, cached incidents count %d but got %d,%d", expectIncidentsCount, cache.Count(), cacheWithEdgeIndexing.Count()) } - expectAffectedWaysCount = 3 // only store blocked incidents + expectAffectedWaysCount = 4 // only store blocked incidents if cache.AffectedWaysCount() != expectAffectedWaysCount || cacheWithEdgeIndexing.AffectedWaysCount() != expectAffectedWaysCount { t.Errorf("expect cached incidents affect ways count %d but got %d,%d", expectAffectedWaysCount, cache.AffectedWaysCount(), cacheWithEdgeIndexing.AffectedWaysCount()) } - expectAffectedEdgesCount = 7 + expectAffectedEdgesCount = 9 if cacheWithEdgeIndexing.AffectedEdgesCount() != expectAffectedEdgesCount { t.Errorf("expect cached incidents affect edges count %d but got %d", expectAffectedEdgesCount, cacheWithEdgeIndexing.AffectedEdgesCount()) } @@ -148,11 +217,11 @@ func TestIncidentsCache(t *testing.T) { } -func newIncidentsResponses(incidents []*proxy.Incident, action proxy.Action) []*proxy.IncidentResponse { +func newIncidentsResponses(incidents []*trafficproxy.Incident, action trafficproxy.Action) []*trafficproxy.IncidentResponse { - incidentsResponses := []*proxy.IncidentResponse{} + incidentsResponses := []*trafficproxy.IncidentResponse{} for _, incident := range incidents { - incidentsResponses = append(incidentsResponses, &proxy.IncidentResponse{Incident: incident, Action: action, XXX_NoUnkeyedLiteral: struct{}{}, XXX_unrecognized: nil, XXX_sizecache: 0}) + incidentsResponses = append(incidentsResponses, &trafficproxy.IncidentResponse{Incident: incident, Action: action, XXX_NoUnkeyedLiteral: struct{}{}, XXX_unrecognized: nil, XXX_sizecache: 0}) } return incidentsResponses } diff --git a/integration/trafficcache/incidentscache/interface.go b/integration/trafficcache/incidentscache/interface.go index a4e83c04f08..088a58d7bef 100644 --- a/integration/trafficcache/incidentscache/interface.go +++ b/integration/trafficcache/incidentscache/interface.go @@ -5,7 +5,7 @@ import ( "sync" "github.com/Telenav/osrm-backend/integration/graph" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/Telenav/osrm-backend/integration/wayidsmap" "github.com/golang/glog" ) @@ -13,7 +13,7 @@ import ( // Cache stores incidents in memory. type Cache struct { m sync.RWMutex - incidents map[string]*proxy.Incident + incidents map[string]*trafficproxy.Incident wayIDBlockedByIncidentIDs map[int64]map[string]struct{} // wayID -> IncidentID,IncidentID,... // optional @@ -25,7 +25,7 @@ type Cache struct { func New() *Cache { return &Cache{ sync.RWMutex{}, - map[string]*proxy.Incident{}, + map[string]*trafficproxy.Incident{}, map[int64]map[string]struct{}{}, nil, nil, @@ -41,7 +41,7 @@ func NewWithEdgeIndexing(wayID2Edges wayidsmap.Way2Edges) *Cache { return &Cache{ sync.RWMutex{}, - map[string]*proxy.Incident{}, + map[string]*trafficproxy.Incident{}, map[int64]map[string]struct{}{}, wayID2Edges, map[graph.Edge]map[string]struct{}{}, @@ -53,7 +53,7 @@ func (c *Cache) Clear() { c.m.Lock() defer c.m.Unlock() - c.incidents = map[string]*proxy.Incident{} + c.incidents = map[string]*trafficproxy.Incident{} c.wayIDBlockedByIncidentIDs = map[int64]map[string]struct{}{} if c.edgeBlockedByIncidentIDs != nil { c.edgeBlockedByIncidentIDs = map[graph.Edge]map[string]struct{}{} @@ -127,7 +127,7 @@ func (c *Cache) AffectedEdgesCount() int { } // Update updates incidents in cache. -func (c *Cache) Update(incidentResponses []*proxy.IncidentResponse) { +func (c *Cache) Update(incidentResponses []*trafficproxy.IncidentResponse) { if len(incidentResponses) == 0 { return } @@ -136,10 +136,10 @@ func (c *Cache) Update(incidentResponses []*proxy.IncidentResponse) { defer c.m.Unlock() for _, incidentResp := range incidentResponses { - if incidentResp.Action == proxy.Action_UPDATE || incidentResp.Action == proxy.Action_ADD { //TODO: Action_ADD will be removed soon + if incidentResp.Action == trafficproxy.Action_UPDATE { c.unsafeUpdate(incidentResp.Incident) continue - } else if incidentResp.Action == proxy.Action_DELETE { + } else if incidentResp.Action == trafficproxy.Action_DELETE { c.unsafeDelete(incidentResp.Incident) continue } diff --git a/integration/trafficcache/incidentscache/update.go b/integration/trafficcache/incidentscache/update.go index 19a6102a115..8a7a47ae7c3 100644 --- a/integration/trafficcache/incidentscache/update.go +++ b/integration/trafficcache/incidentscache/update.go @@ -1,16 +1,16 @@ package incidentscache import ( - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) -func (c *Cache) unsafeUpdate(incident *proxy.Incident) { +func (c *Cache) unsafeUpdate(incident *trafficproxy.Incident) { if incident == nil { glog.Fatal("empty incident") return } - if len(incident.AffectedWayIds) == 0 { + if len(incident.AffectedWayIDs) == 0 { glog.Warningf("empty AffectedWayIds in incident %v", incident) return } @@ -18,33 +18,36 @@ func (c *Cache) unsafeUpdate(incident *proxy.Incident) { return // we only take care of blocking incidents } - incidentInCache, foundIncidentInCache := c.incidents[incident.IncidentId] + incidentInCache, foundIncidentInCache := c.incidents[incident.IncidentID] if foundIncidentInCache { - c.unsafeDeleteWayIDsBlockedByIncidentID(incidentInCache.AffectedWayIds, incidentInCache.IncidentId) + if incidentInCache.Timestamp > incident.Timestamp { + return // ignore older incident + } + c.unsafeDeleteWayIDsBlockedByIncidentID(incidentInCache.AffectedWayIDs, incidentInCache.IncidentID) if c.wayID2Edges != nil && c.edgeBlockedByIncidentIDs != nil { - c.unsafeDeleteEdgesBlockedByIncidentID(incidentInCache.AffectedWayIds, incidentInCache.IncidentId) + c.unsafeDeleteEdgesBlockedByIncidentID(incidentInCache.AffectedWayIDs, incidentInCache.IncidentID) } } - c.incidents[incident.IncidentId] = incident - c.unsafeAddWayIDsBlockedByIncidentID(incident.AffectedWayIds, incident.IncidentId) + c.incidents[incident.IncidentID] = incident + c.unsafeAddWayIDsBlockedByIncidentID(incident.AffectedWayIDs, incident.IncidentID) if c.wayID2Edges != nil && c.edgeBlockedByIncidentIDs != nil { - c.unsafeAddEdgesBlockedByIncidentID(incident.AffectedWayIds, incident.IncidentId) + c.unsafeAddEdgesBlockedByIncidentID(incident.AffectedWayIDs, incident.IncidentID) } } -func (c *Cache) unsafeDelete(incident *proxy.Incident) { +func (c *Cache) unsafeDelete(incident *trafficproxy.Incident) { if incident == nil { glog.Fatal("empty incident") return } - incidentInCache, foundIncidentInCache := c.incidents[incident.IncidentId] + incidentInCache, foundIncidentInCache := c.incidents[incident.IncidentID] if foundIncidentInCache { - c.unsafeDeleteWayIDsBlockedByIncidentID(incidentInCache.AffectedWayIds, incidentInCache.IncidentId) + c.unsafeDeleteWayIDsBlockedByIncidentID(incidentInCache.AffectedWayIDs, incidentInCache.IncidentID) if c.wayID2Edges != nil && c.edgeBlockedByIncidentIDs != nil { - c.unsafeDeleteEdgesBlockedByIncidentID(incidentInCache.AffectedWayIds, incidentInCache.IncidentId) + c.unsafeDeleteEdgesBlockedByIncidentID(incidentInCache.AffectedWayIDs, incidentInCache.IncidentID) } - delete(c.incidents, incident.IncidentId) + delete(c.incidents, incident.IncidentID) } } diff --git a/integration/trafficcache/querytraffic/interface.go b/integration/trafficcache/querytraffic/interface.go index f96f7c32cf4..b83743449a4 100644 --- a/integration/trafficcache/querytraffic/interface.go +++ b/integration/trafficcache/querytraffic/interface.go @@ -1,12 +1,12 @@ package querytraffic import ( - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) // Inquirer defines interfaces for querying traffic flows and incidents. type Inquirer interface { - QueryFlow(int64) *proxy.Flow + QueryFlow(int64) *trafficproxy.Flow BlockedByIncident(int64) bool } diff --git a/integration/trafficcache/querytrafficbyedge/interface.go b/integration/trafficcache/querytrafficbyedge/interface.go index bc4d40c64d2..bf103aeac63 100644 --- a/integration/trafficcache/querytrafficbyedge/interface.go +++ b/integration/trafficcache/querytrafficbyedge/interface.go @@ -2,13 +2,13 @@ package querytrafficbyedge import ( "github.com/Telenav/osrm-backend/integration/graph" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" ) // Inquirer defines interfaces for querying traffic flows and incidents. type Inquirer interface { - QueryFlow(graph.Edge) *proxy.Flow - QueryFlows([]graph.Edge) []*proxy.Flow + QueryFlow(graph.Edge) *trafficproxy.Flow + QueryFlows([]graph.Edge) []*trafficproxy.Flow EdgeBlockedByIncident(graph.Edge) bool diff --git a/integration/trafficcache/trafficcache/cache.go b/integration/trafficcache/trafficcache/cache.go index 90fa72e2876..407f179c42d 100644 --- a/integration/trafficcache/trafficcache/cache.go +++ b/integration/trafficcache/trafficcache/cache.go @@ -1,7 +1,7 @@ package trafficcache import ( - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/Telenav/osrm-backend/integration/trafficcache/flowscache" "github.com/Telenav/osrm-backend/integration/trafficcache/incidentscache" "github.com/golang/glog" @@ -29,14 +29,14 @@ func (c *Cache) Clear() { } // Eat implements trafficeater.Eater inteface. -func (c *Cache) Eat(r proxy.TrafficResponse) { +func (c *Cache) Eat(r trafficproxy.TrafficResponse) { glog.V(1).Infof("new traffic for cache, flows: %d, incidents: %d", len(r.FlowResponses), len(r.IncidentResponses)) c.Flows.Update(r.FlowResponses) c.Incidents.Update(r.IncidentResponses) } // QueryFlow returns Live Traffic Flow if exist. -func (c *Cache) QueryFlow(wayID int64) *proxy.Flow { +func (c *Cache) QueryFlow(wayID int64) *trafficproxy.Flow { return c.Flows.Query(wayID) } diff --git a/integration/trafficcache/trafficcacheindexedbyedge/cache.go b/integration/trafficcache/trafficcacheindexedbyedge/cache.go index 684deb62ebf..a603911455f 100644 --- a/integration/trafficcache/trafficcacheindexedbyedge/cache.go +++ b/integration/trafficcache/trafficcacheindexedbyedge/cache.go @@ -2,7 +2,7 @@ package trafficcacheindexedbyedge import ( "github.com/Telenav/osrm-backend/integration/graph" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/Telenav/osrm-backend/integration/trafficcache/flowscacheindexedbyedge" "github.com/Telenav/osrm-backend/integration/trafficcache/incidentscache" "github.com/Telenav/osrm-backend/integration/wayidsmap" @@ -31,19 +31,19 @@ func (c *Cache) Clear() { } // Eat implements trafficeater.Eater inteface. -func (c *Cache) Eat(r proxy.TrafficResponse) { +func (c *Cache) Eat(r trafficproxy.TrafficResponse) { glog.V(1).Infof("new traffic for cache, flows: %d, incidents: %d", len(r.FlowResponses), len(r.IncidentResponses)) c.Flows.Update(r.FlowResponses) c.Incidents.Update(r.IncidentResponses) } // QueryFlow returns Live Traffic Flow if exist. -func (c *Cache) QueryFlow(e graph.Edge) *proxy.Flow { +func (c *Cache) QueryFlow(e graph.Edge) *trafficproxy.Flow { return c.Flows.QueryByEdge(e) } // QueryFlows returns Live Traffic Flows if exist. -func (c *Cache) QueryFlows(e []graph.Edge) []*proxy.Flow { +func (c *Cache) QueryFlows(e []graph.Edge) []*trafficproxy.Flow { return c.Flows.QueryByEdges(e) } diff --git a/integration/trafficdumper/dump.go b/integration/trafficdumper/dump.go index 9c80db49d7d..b94f8aa82cd 100644 --- a/integration/trafficdumper/dump.go +++ b/integration/trafficdumper/dump.go @@ -6,12 +6,12 @@ import ( "os" "time" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) // DumpFlowResponses dump flows to file/stdout. -func (h Handler) DumpFlowResponses(flowResponses []*proxy.FlowResponse) { +func (h Handler) DumpFlowResponses(flowResponses []*trafficproxy.FlowResponse) { if len(flowResponses) == 0 { return } @@ -53,7 +53,7 @@ func (h Handler) DumpFlowResponses(flowResponses []*proxy.FlowResponse) { } // DumpIncidentResponses dump incidents to file/stdout. -func (h Handler) DumpIncidentResponses(incidentResponses []*proxy.IncidentResponse) { +func (h Handler) DumpIncidentResponses(incidentResponses []*trafficproxy.IncidentResponse) { if len(incidentResponses) == 0 { return } diff --git a/integration/trafficdumper/dump_streaming_delta.go b/integration/trafficdumper/dump_streaming_delta.go index 49c4fcf3ccf..51bfb978ac2 100644 --- a/integration/trafficdumper/dump_streaming_delta.go +++ b/integration/trafficdumper/dump_streaming_delta.go @@ -4,16 +4,17 @@ import ( "fmt" "time" - proxy "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" + "github.com/Telenav/osrm-backend/integration/pkg/trafficproxy" "github.com/golang/glog" ) // DumpStreamingDelta dumps traffic response from streaming delta channel. -func DumpStreamingDelta(responseChan <-chan proxy.TrafficResponse) { +func DumpStreamingDelta(responseChan <-chan trafficproxy.TrafficResponse) { h := NewHandler() startTime := time.Now() - trafficResponse := proxy.TrafficResponse{} + trafficResponse := trafficproxy.TrafficResponse{} + var totalFlowsCount, totalIncidentsCount uint64 for { resp, ok := <-responseChan @@ -27,13 +28,16 @@ func DumpStreamingDelta(responseChan <-chan proxy.TrafficResponse) { } // handle per interval - glog.Infof("handling flows,incidents(%d,%d) from streaming delta, interval %f seconds", - len(trafficResponse.FlowResponses), len(trafficResponse.IncidentResponses), timeInterval.Seconds()) + totalFlowsCount += uint64(len(trafficResponse.FlowResponses)) + totalIncidentsCount += uint64(len(trafficResponse.IncidentResponses)) + glog.Infof("handling flows,incidents(%d,%d) from streaming delta, interval %f seconds. Totally handled flows,incidents(%d,%d) so far.", + len(trafficResponse.FlowResponses), len(trafficResponse.IncidentResponses), timeInterval.Seconds(), totalFlowsCount, totalIncidentsCount) if h.writeToFile && h.streamingDeltaSplitDumpFiles { h.updateDumpFileNamePrefix() } h.DumpFlowResponses(trafficResponse.FlowResponses) h.DumpIncidentResponses(trafficResponse.IncidentResponses) + trafficResponse = trafficproxy.TrafficResponse{} // clean up if !ok { // streaming delta channel no longer available, break after handling to make sure cached data processing. break diff --git a/integration/proxy.proto b/integration/trafficproxy.proto similarity index 61% rename from integration/proxy.proto rename to integration/trafficproxy.proto index d11b40295ab..8f5cefa6311 100644 --- a/integration/proxy.proto +++ b/integration/trafficproxy.proto @@ -1,6 +1,8 @@ syntax = "proto3"; -package proxy; +package trafficproxy; + +option cc_enable_arenas = true; service TrafficService { rpc GetTrafficData(TrafficRequest) returns (stream TrafficResponse) {} @@ -12,15 +14,20 @@ message TrafficRequest { oneof request_oneof { TrafficAllRequest trafficAllRequest = 3; - TrafficWayIdsRequest trafficWayIdsRequest = 4; + TrafficWayIDsRequest trafficWayIDsRequest = 4; TrafficStreamingDeltaRequest trafficStreamingDeltaRequest = 5; } + + // timestamp of expected traffic data, number of milliseconds since the Epoch. + // 0 means request for current(live) traffic data, otherwise request for archived traffic data. + // For archived traffic data, only allows trafficAllRequest and trafficWayIDsRequest. + int64 timestamp = 6; } message TrafficAllRequest {} -message TrafficWayIdsRequest { - repeated sint64 wayIds = 1; // positive means forward, negative means backward +message TrafficWayIDsRequest { + repeated sint64 wayIDs = 1; // positive means forward, negative means backward } message TrafficStreamingDeltaRequest { @@ -41,6 +48,7 @@ message TrafficSource { string region = 1; string trafficProvider = 2; string mapProvider = 3; + repeated string subregion = 4; // leave empty if requires full region data } enum TrafficType { @@ -51,12 +59,14 @@ enum TrafficType { message FlowResponse { Flow flow = 1; Action action = 2; + string trasctionID = 3; // unique ID for tracking the Flow from provider to end user } message Flow { - sint64 wayId = 1; // positive means forward, negative means backward - float speed = 2; // always >= 0 - TrafficLevel trafficLevel = 3; + sint64 wayID = 1; // positive means forward, negative means backward + float speed = 2; // unit: meter per second. always >= 0 + TrafficLevel trafficLevel = 3; // `trafficLevel==CLOSED` means blocking Flow + int64 timestamp = 4; // received timestamp from provider, number of milliseconds since the Epoch } enum TrafficLevel { @@ -69,19 +79,19 @@ enum TrafficLevel { } enum Action { - ADD = 0; - UPDATE = 1; - DELETE = 2; + UPDATE = 0; + DELETE = 1; } message IncidentResponse { Incident incident = 1; Action action = 2; + string trasctionID = 3; // unique ID for tracking the Incident from provider to end user } message Incident { - string incidentId = 1; // unique id of this incident - repeated sint64 affectedWayIds = 2; // positive means forward, negative means backward + string incidentID = 1; // unique ID of this incident + repeated sint64 affectedWayIDs = 2; // positive means forward, negative means backward IncidentType incidentType = 3; IncidentSeverity incidentSeverity = 4; Location incidentLocation = 5; @@ -92,6 +102,7 @@ message Incident { int32 eventCode = 10; int32 alertCEventQuantifier = 11; bool isBlocking = 12; + int64 timestamp = 13; // received timestamp from provider, number of milliseconds since the Epoch } enum IncidentType {