Streaming experiment for ETA service #357

Open
CodeBear801 opened this issue Jun 17, 2020 · 1 comment
Labels: Ideas (long-term discussion), InProgress (currently working on it)

Comments

CodeBear801 commented Jun 17, 2020

subtask of #355

In summary:

  • Experiment with Google products (Dataflow, Pub/Sub, Data Studio, BigQuery, Cloud Bigtable)
  • Experiment with an open-source stack (Spark/Flink, Kafka, ELK, MongoDB)

More info


CodeBear801 commented Jun 22, 2020

Draft streaming system diagram, related to #356 (comment)

Context diagram

(context diagram image)

  • Input is the user's movement events; the ideal format for analysis and machine learning is (see the example record below):
trace_id, userid, start_position, end_position, duration, distance, osrm_legs, avg_speed, osrm_distance, osrm_duration, osrm_edge_list, spatial_index_cell_list...

(image)
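
A minimal example of one such record written as a Python dict (all field values below are made up for illustration):

example_record = {
    "trace_id": "t-001",
    "userid": "u-42",
    "start_position": (13.3888, 52.5170),   # lon, lat
    "end_position": (13.3977, 52.5206),
    "duration": 420,            # seconds, observed
    "distance": 1850.0,         # meters, observed
    "avg_speed": 4.4,           # m/s
    "osrm_legs": 1,
    "osrm_distance": 1872.3,    # meters, from the routing engine
    "osrm_duration": 401,       # seconds, from the routing engine
    "osrm_edge_list": [1054, 1055, 1090],
    "spatial_index_cell_list": ["cell_1", "cell_2"],
}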

For how the output will be used, see #356 (comment).

  • But waiting for an entire trip to finish could take a long time, and a single GPS point is not helpful for ETA (it might just be a noise point); what we need is partial trips continuously fed into the system for analytics and for training real-time models

Proposal:
(diagram image)

Another proposal:
(diagram image)

  • Partial trip requirements

    • trip detection: similar to session detection, we need the ability to group a user's GPS trace under a unique ID at run time
    • partial trip analysis: map matching, then using OSRM to generate guidance information
    • GPS noise filtering
    • spatial index assignment for the point list
  • The condition for generating a partial trip is adjustable (see the sketch after this list)

    • aggregate x minutes for an active user (5 minutes or less)
    • aggregate y number of points (500, 1000)
    • there should be some overlap between consecutive partial trips
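
A minimal sketch of that emission condition in Python, assuming x = 5 minutes, y = 1000 points, and a 100-point overlap between consecutive partial trips (the thresholds and names are illustrative, not decided):

from datetime import timedelta

MAX_WINDOW = timedelta(minutes=5)   # aggregate at most x minutes for an active user
MAX_POINTS = 1000                   # or y points, whichever comes first
OVERLAP = 100                       # tail points reused so consecutive partial trips overlap

def maybe_emit_partial_trip(buffer, now):
    """buffer: list of (timestamp, gps_point) for one trip ID, ordered by time."""
    if not buffer:
        return None, buffer
    window_full = now - buffer[0][0] >= MAX_WINDOW
    count_full = len(buffer) >= MAX_POINTS
    if window_full or count_full:
        partial_trip = [point for _, point in buffer]
        # keep the tail so the next partial trip overlaps with this one
        return partial_trip, buffer[-OVERLAP:]
    return None, buffer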

Container diagram

(container diagram image)

  • event bus: Kafka, Kinesis, or a managed service like Pub/Sub would all work; Kafka is preferred (see the publishing sketch after this list)
  • streaming system: Apache Beam, Apache Flink, or a managed service; I think any of them works for the expected logic. Python vs Scala?!
  • The purpose of the analytic DB is
    • experimenting with data during feature engineering
    • reporting the live status of the input data; for example, we could query how many users passed edge_1 or cell_1 over the past two hours with second-level query latency (columnar data)
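
A minimal sketch of publishing movement events to the event bus with the kafka-python client (the topic name, key choice, and broker address are assumptions, not settled decisions):

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_gps_event(event):
    # key by user ID so one user's points stay ordered within a partition
    producer.send("gps-events", key=event["userid"].encode("utf-8"), value=event)

publish_gps_event({"userid": "u-42", "lon": 13.3888, "lat": 52.5170, "ts": 1592400000})
producer.flush()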

Component diagram

(component diagram image)

Aggregator

  • Assign a trip ID to each event point

  • Group by session window, triggered by GPS point count or by time (see the sketch after this list)
    (image)

  • It generates a group of points tagged with a trip ID
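
A minimal sketch of that grouping with Apache Beam Python SDK session windows; the 10-minute gap and the trigger values are assumptions for illustration:

import apache_beam as beam
from apache_beam.transforms import window, trigger

def aggregate_partial_trips(points):
    """points: PCollection of (user_id, gps_point) with event timestamps attached."""
    return (
        points
        | "SessionWindow" >> beam.WindowInto(
            window.Sessions(gap_size=10 * 60),        # close the trip after 10 min of silence
            trigger=trigger.Repeatedly(
                trigger.AfterAny(
                    trigger.AfterCount(500),              # fire every 500 points...
                    trigger.AfterProcessingTime(5 * 60),  # ...or every 5 minutes
                )
            ),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "GroupByTrip" >> beam.GroupByKey()  # (user_id, [points in this partial trip])
    )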

Unique ID assigner

  • Similar to the idea of a session, we try to group as many GPS points as possible into a single trip until
    • we get a stop event (if there is any)
    • no new points have been received for a certain time window; subsequent GPS point events will be assigned a new trip ID
  • Basically, we just want a distributed unique ID; there are multiple ways to get one, e.g. generating it via Redis or adopting the Snowflake idea (see the sketch below)
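
A minimal sketch of a Snowflake-style ID generator (the bit layout and worker_id handling are assumptions; a Redis INCR counter would serve the same purpose):

import threading
import time

class SnowflakeIdGenerator:
    """Packs a 41-bit millisecond timestamp, a 10-bit worker id and a 12-bit sequence."""

    def __init__(self, worker_id):
        self.worker_id = worker_id & 0x3FF
        self.sequence = 0
        self.last_ms = -1
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now_ms = int(time.time() * 1000)
            if now_ms == self.last_ms:
                self.sequence = (self.sequence + 1) & 0xFFF
                if self.sequence == 0:               # sequence exhausted, wait for the next ms
                    while now_ms <= self.last_ms:
                        now_ms = int(time.time() * 1000)
            else:
                self.sequence = 0
            self.last_ms = now_ms
            return (now_ms << 22) | (self.worker_id << 12) | self.sequence

trip_id = SnowflakeIdGenerator(worker_id=1).next_id()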

Trip Data Enhancer (feature engineering)

Draft flow:

  • Load data from the event bus
  • task: compute route features (via OSRM or any navigation engine; a sketch follows this list)
    • map matcher: generate a list of edges, data cleaning
    • generate the route response for number of legs, route distance, attributes, etc.
  • task: compute spatial index
  • task: compute other features
  • join with the original data (or generate a separate table and join at the machine learning step)
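
A minimal sketch of the route-feature task against an OSRM HTTP endpoint (the host, the driving profile, and the picked response fields are assumptions):

import requests

OSRM = "http://localhost:5000"  # assumed self-hosted OSRM instance

def compute_route_features(points):
    """points: list of (lon, lat) tuples for one partial trip; returns a feature dict."""
    coords = ";".join(f"{lon},{lat}" for lon, lat in points)
    # map matching: snap the raw GPS trace to the road network
    match = requests.get(f"{OSRM}/match/v1/driving/{coords}").json()
    matched = match["matchings"][0]
    return {
        "osrm_distance": matched["distance"],   # meters along the matched route
        "osrm_duration": matched["duration"],   # seconds along the matched route
        "osrm_legs": len(matched["legs"]),
    }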

Why separate Aggregator and Trip Data Enhancer

  • They could be put into a single streaming pipeline
  • But considering that we might directly accept historical trip GPS traces and apply the Trip Data Enhancer to them, it might be better to keep them as separate components

Notes:

  • I think either Apache Beam or Apache Flink works as the streaming infrastructure; each task is an interface representing an action (usually a remote service call), and the streaming framework chains them together
  • For each partial trace, group by trip ID; the default trigger should work
  • The machine learning component can be triggered either by a timer (every few minutes) and/or by a certain amount of accumulated data (e.g. 1000 trips)

Minimum Viable Product

Road Trip Enhancer (V1)

Don't use an additional framework; just handle the input and generate the expected output based on one or two remote services, running on a single node.

  • input: trips containing GPS points
trip_id_123, point1, point2, point3, ....
trip_id_124, point1, point2, point3, ....
trip_id_125, point1, point2, point3, ....
  • Main logic: call several remote services and generate features (a Python sketch follows below)
inputRecord = retrieve data from message queue
mapMatchedFuture = mapMatcher(inputRecord)
routeResponseFuture = router(mapMatchedFuture)
...
join results
publish result to message queue
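
A minimal single-node sketch of that loop, where map_matcher/router wrap the remote calls and the queue client exposes poll/publish (all of these names are hypothetical):

from concurrent.futures import ThreadPoolExecutor

def run_enhancer_v1(queue, map_matcher, router):
    with ThreadPoolExecutor(max_workers=8) as pool:
        while True:
            record = queue.poll()                            # trip_id plus its gps points
            if record is None:
                continue
            matched_future = pool.submit(map_matcher, record)             # remote call 1
            route_future = pool.submit(router, matched_future.result())   # remote call 2
            features = {**record, **route_future.result()}                # join the results
            queue.publish("enhanced-trips", features)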

Road Trip Enhancer (V2)

Use a streaming calculation framework to replace the internal logic of V1, keeping the input and output parts (a Beam-based sketch follows the list of reasons below).

on(loader)
.on(generateFeature1)
.on(generateFeature2)
...
.on(publish)

The reasons for using a streaming framework are

  • scale: being able to scale out to multiple worker nodes
  • error handling
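
A minimal sketch of the same chain expressed as an Apache Beam pipeline, where each feature task is a Map step the framework can scale out; the stub functions and the Create source stand in for the real event-bus IO:

import apache_beam as beam

def route_features(trip):    # stub: call OSRM / the map matcher here
    return {**trip, "osrm_distance": 0.0}

def spatial_index(trip):     # stub: assign spatial index cells to each point here
    return {**trip, "cells": []}

with beam.Pipeline() as p:
    (
        p
        | "Load" >> beam.Create([{"trip_id": "trip_id_123", "points": []}])  # event bus in real use
        | "Feature1" >> beam.Map(route_features)
        | "Feature2" >> beam.Map(spatial_index)
        | "Publish" >> beam.Map(print)  # would publish back to the event bus
    )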

Aggregator

The Aggregator is mainly used to achieve the real-time goal; at the beginning, we could feed batched results to the downstream components until we have a clear understanding of this component.
