
[ML] Data frame progress is not reporting for continuous #44557

Closed
sophiec20 opened this issue Jul 18, 2019 · 3 comments · Fixed by #45361
Labels
:ml/Transform Transform :ml Machine learning v7.4.0

Comments

@sophiec20
Contributor

sophiec20 commented Jul 18, 2019

Found in 7.3.0-SNAPSHOT: { "build" : { "hash" : "a57a5c5", "date" : "2019-07-16T14:52:05.956252Z" } }

Checkpoint progress is not being updated for checkpoints 1 and above, presumably since the introduction of page-by-page processing to avoid terms explosion. Progress is reported and updated for checkpoint 0.

This is related to #43767. We may wish to revisit how we can show indicative checkpoint progress for continuous data frames. For example, 20% progress at page 2 of 10 could be a good enough indicator.

In debug logging you can see the following:

[2019-07-18T11:03:33,859][DEBUG][o.e.x.c.i.AsyncTwoPhaseIndexer] [node1] Beginning to index [df01], state: [STARTED]
[2019-07-18T11:03:34,944][DEBUG][o.e.x.d.c.DataFrameTransformsCheckpointService] [node1] Failed to retrieve source checkpoint for data frame [df01]
[2019-07-18T11:03:35,556][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"2.124.127.204"},"bucket_position":{"clientip":"2.122.228.0"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":15692,"percent_complete":50.9625}}]
[2019-07-18T11:03:40,741][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"31.51.217.58"},"bucket_position":{"clientip":"31.49.140.121"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:03:46,353][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"49.212.153.96"},"bucket_position":{"clientip":"46.252.71.2"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:03:51,018][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"64.246.178.34"},"bucket_position":{"clientip":"64.246.161.190"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:03:55,571][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"70.63.164.253"},"bucket_position":{"clientip":"70.29.234.38"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:03:59,916][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"80.128.104.91"},"bucket_position":{"clientip":"80.87.25.79"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:03,630][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"82.145.209.89"},"bucket_position":{"clientip":"82.132.245.233"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:06,957][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"86.161.217.16"},"bucket_position":{"clientip":"86.161.54.214"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:10,428][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"91.192.111.141"},"bucket_position":{"clientip":"91.125.129.31"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:14,504][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"101.226.168.240"},"bucket_position":{"clientip":"101.226.166.216"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:17,897][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"157.55.34.107"},"bucket_position":{"clientip":"157.55.33.249"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:21,134][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Updating persistent state of transform [df01] to [{"task_state":"started","indexer_state":"indexing","position":{"indexer_position":{"clientip":"188.29.164.205"},"bucket_position":{"clientip":"188.29.37.197"}},"checkpoint":5,"progress":{"total_docs":32000,"docs_remaining":0,"percent_complete":100.0}}]
[2019-07-18T11:04:23,961][DEBUG][o.e.x.c.i.AsyncTwoPhaseIndexer] [node1] Finished indexing for job [df01], saving state and shutting down.
[2019-07-18T11:04:23,961][DEBUG][o.e.x.d.t.DataFrameTransformTask] [node1] Finished indexing for data frame transform [df01] checkpoint [5]
sophiec20 added the :ml Machine learning, :ml/Transform Transform and v7.4.0 labels on Jul 18, 2019
@elasticmachine
Collaborator

Pinging @elastic/ml-core

@benwtrent
Member

benwtrent commented Jul 19, 2019

Solutions (!?¡¿)

Option 0

Don’t do regular progress for continuous

Once it is past the initial batch loading, checkpoints will be processed pretty quickly. It seems much more valuable to have "current checkpoint, when it started, and total processed documents" type of information. This, in conjunction with "average checkpoint processing time" and "average processed documents", makes the most sense to me. Having a progress bar that fills quickly without context does not help much with continuous.
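
A minimal sketch of what those per-checkpoint statistics could track. The class and field names here are hypothetical, not the transform's actual state objects:

```java
// Hypothetical names only; this is not the transform's actual state class.
public class ContinuousCheckpointStats {

    private long currentCheckpoint;
    private long currentCheckpointStartMillis;
    private long currentCheckpointDocsProcessed;

    private long completedCheckpoints;
    private long totalCheckpointTimeMillis;
    private long totalDocsProcessed;

    public void startCheckpoint(long checkpoint, long nowMillis) {
        currentCheckpoint = checkpoint;
        currentCheckpointStartMillis = nowMillis;
        currentCheckpointDocsProcessed = 0;
    }

    public void onDocsProcessed(long docs) {
        currentCheckpointDocsProcessed += docs;
    }

    public void finishCheckpoint(long nowMillis) {
        completedCheckpoints++;
        totalCheckpointTimeMillis += nowMillis - currentCheckpointStartMillis;
        totalDocsProcessed += currentCheckpointDocsProcessed;
    }

    public long getCurrentCheckpoint() { return currentCheckpoint; }
    public long getCurrentCheckpointDocsProcessed() { return currentCheckpointDocsProcessed; }

    // "average checkpoint processing time"
    public double averageCheckpointTimeMillis() {
        return completedCheckpoints == 0 ? 0.0 : (double) totalCheckpointTimeMillis / completedCheckpoints;
    }

    // "average processed documents"
    public double averageDocsPerCheckpoint() {
        return completedCheckpoints == 0 ? 0.0 : (double) totalDocsProcessed / completedCheckpoints;
    }
}
```

With something like this, a user (or the UI) can compare the current checkpoint's elapsed time and processed docs against the historical averages instead of watching a percentage.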

Option 1

Calculate the progress total documents incrementally alongside the PARTIAL_RUN_IDENTIFY_CHANGES run state

Positives

  • Fits in nicely with the new way changes are gathered
  • Low overhead

Negatives

  • Progress within a checkpoint is a continual “cat and mouse” game, with the total documents increasing AND the processed documents increasing.
    • This will probably end up looking like the progress reaches a specific percentage (say 70%), then drops to a lower percentage (60%), then increases again to a higher percentage (80%), and the cycle repeats.
    • We COULD get around this by making the progress percentage NOT mean “progress of a checkpoint” but “progress of a specific page of a checkpoint”

Note: We don’t know the total number of changed terms, the total number of pages required to gather the changed terms, or the number of pages to process the aggs

Implementation

  • Add a filtered query, using the changed terms, that updates the total docs after each PARTIAL_RUN_IDENTIFY_CHANGES run (see the sketch below)
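
A rough sketch of that filtered count, written against the high-level REST client purely for illustration (the real transform code runs through the internal client; the class name, index wiring and whether a date filter should also be applied are all assumptions):

```java
import java.io.IOException;
import java.util.List;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class IncrementalProgressSketch {

    private final RestHighLevelClient client;
    private long totalDocs; // running "total_docs" used by the progress indicator

    public IncrementalProgressSketch(RestHighLevelClient client) {
        this.client = client;
    }

    // Called after each PARTIAL_RUN_IDENTIFY_CHANGES run produces a page of changed terms.
    public void addDocsForChangedTerms(String sourceIndex,
                                       String termField,
                                       List<String> changedTerms) throws IOException {
        // Count the source docs belonging to the changed terms; a date range filter
        // could be added here too, depending on which docs the checkpoint will touch.
        BoolQueryBuilder query = QueryBuilders.boolQuery()
            .filter(QueryBuilders.termsQuery(termField, changedTerms));

        CountRequest countRequest = new CountRequest(sourceIndex)
            .source(new SearchSourceBuilder().query(query));

        // The total keeps growing page by page, which is the "cat and mouse"
        // behaviour described in the negatives above.
        totalDocs += client.count(countRequest, RequestOptions.DEFAULT).getCount();
    }

    public long getTotalDocs() {
        return totalDocs;
    }
}
```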

Option 2

Calculate all the changed terms and use them to gather total docs

Positives

  • It would be accurate and won’t suffer from the goal posts moving

Negatives

  • This would require a composite aggregation lookup of the past data with the date filter.
    • This implies we would essentially be running the query portion of the transform (only with the terms group_by) three times per continuous checkpoint:
      • To get the progress total docs
      • To get the changed buckets for the internal processing
      • To gather the actual pivot data + aggregations

Implementation

For each composite aggregation page:

  • We do a query over the past data for the buckets that exist
  • Add the total_count from that terms query to our total docs to update

The summation of all the total_count values from each of the queries will provide all the docs that will eventually be queried in that checkpoint.
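
A hedged sketch of that per-page summation; `ChangedBucketSource` is a hypothetical stand-in for the terms-only composite aggregation and the date-filtered count query described above:

```java
import java.util.List;

// Hypothetical stand-in: nextPage() represents one page of the terms-only composite
// aggregation over the change window; countInPastData() represents the date-filtered
// count (total_count) against the existing data for that page of bucket keys.
interface ChangedBucketSource {
    List<String> nextPage();                        // null when there are no more pages
    long countInPastData(List<String> bucketKeys);  // total_count for those buckets
}

final class Option2ProgressSketch {
    static long estimateTotalDocs(ChangedBucketSource source) {
        long totalDocs = 0;
        List<String> page;
        while ((page = source.nextPage()) != null) {
            totalDocs += source.countInPastData(page);
        }
        return totalDocs; // every doc the checkpoint will eventually query
    }
}
```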

Option 3

Change how we do this intermittent bucket gathering so that ALL changed terms are gathered before we start querying through and indexing the data

Positives

  • We get the total changed docs for free (almost)

Negatives

  • Yet another huge refactor of the internal data frame indexer logic
  • This also requires partitioning out the changed terms. I am sure that @hendrikmuhs considered this solution while working out how to handle terms explosion and did not use it for a pretty good reason.

Implementation (unsure if this would work)

  • Run through all the pages built via PARTIAL_RUN_IDENTIFY_CHANGES at the start of the checkpoint
    • For each step when gathering the changes, execute a filtered query with those changed terms to gather the total docs (the summation of which will indicate how many documents need to be processed that checkpoint)
  • Paginate the changed bucket terms out to avoid the limit (paging has to equal the paging when gathering the buckets...I think)
  • Increment the processed bucket count per normal.
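
A small sketch of the up-front summation this option would allow, assuming the changed terms have already been fully gathered; `countDocsForTerms` is a hypothetical stand-in for the filtered count query, and the page size would have to line up with the terms-filter limits mentioned above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Sketch only: all changed terms are assumed to be gathered up front, partitioned into
// pages small enough for a terms filter, and the filtered counts summed before any
// pivot data is indexed. countDocsForTerms stands in for the filtered count query.
final class Option3ProgressSketch {

    static long totalDocsForCheckpoint(List<String> allChangedTerms,
                                       int pageSize,
                                       ToLongFunction<List<String>> countDocsForTerms) {
        long totalDocs = 0;
        for (int from = 0; from < allChangedTerms.size(); from += pageSize) {
            int to = Math.min(from + pageSize, allChangedTerms.size());
            List<String> page = new ArrayList<>(allChangedTerms.subList(from, to));
            totalDocs += countDocsForTerms.applyAsLong(page);
        }
        return totalDocs; // known before indexing starts, so progress never moves backwards
    }
}
```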

@tveasey
Contributor

tveasey commented Jul 22, 2019

The more I think about it, the more I like Option 0. I'm just not convinced that reporting progress for the current checkpoint is needed in continuous mode: I don't see people sitting watching the progress, so why go to the trouble of trying to get an accurate progress monitor?

The key thing you might want to know is whether the current update is taking much longer than usual. But, as suggested, we can provide that information much more easily by gathering some simple statistics for checkpoints.
