[ML] Multiphase progress reporting for data frame analyses (elastic#1179)
tveasey committed May 4, 2020
1 parent 6929c7c commit 389164b
Showing 19 changed files with 449 additions and 241 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
@@ -56,6 +56,7 @@
operations. (See {ml-pull}1142[#1142].)
* Fix spurious anomalies for count and sum functions after no data are received for long
periods of time. (See {ml-pull}1158[#1158].)
* Break progress reporting of data frame analyses into multiple phases. (See {ml-pull}1179[#1179].)

== {es} version 7.8.0

34 changes: 26 additions & 8 deletions include/api/CDataFrameAnalysisInstrumentation.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

namespace ml {
@@ -54,6 +55,11 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
//! Adds \p delta to the memory usage statistics.
void updateMemoryUsage(std::int64_t delta) override;

//! Start progress monitoring for \p task.
//!
//! \note This resets the current progress to zero.
void startNewProgressMonitoredTask(const std::string& task) override;

//! This adds \p fractionalProgress to the current progress.
//!
//! \note The caller should try to ensure that the sum of the values added
@@ -64,6 +70,9 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
//! and typically this would be called significantly less frequently.
void updateProgress(double fractionalProgress) override;

//! Reset variables related to the job progress.
void resetProgress();

//! Record that the analysis is complete.
void setToFinished();

@@ -74,13 +83,14 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
//! of the proportion of total work complete for a single run.
double progress() const;

//! Reset variables related to the job progress.
void resetProgress();
//! Start polling and writing progress updates.
//!
//! \note This doesn't return until setToFinished is called.
void monitorProgress();

//! Trigger the next step of the job. This will initiate writing the job state
//! to the results pipe.
//! \todo use \p phase to tag different phases of the analysis job.
void nextStep(const std::string& phase = "") override;
//! Flush then reinitialize the instrumentation data. This will trigger
//! writing them to the results pipe.
void flush(const std::string& tag = "") override;

//! \return The peak memory usage.
std::int64_t memory() const;
@@ -97,15 +107,23 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
TWriter* writer();

private:
void writeMemory(std::int64_t timestamp);
static const std::string NO_TASK;

private:
int percentageProgress() const;
virtual void writeAnalysisStats(std::int64_t timestamp) = 0;
virtual void writeState();
void writeMemoryAndAnalysisStats();
// Note this is thread safe.
void writeProgress(const std::string& task, int progress);
void writeMemory(std::int64_t timestamp);

private:
std::string m_JobId;
std::string m_ProgressMonitoredTask;
std::atomic_bool m_Finished;
std::atomic_size_t m_FractionalProgress;
std::atomic<std::int64_t> m_Memory;
static std::mutex ms_ProgressMutex;
TWriterUPtr m_Writer;
};

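As a rough, hypothetical sketch of how the reworked instrumentation API above could be driven for a single monitored phase: the writer thread polls via monitorProgress(), which only returns once setToFinished() is called, while worker code announces the task and accumulates fractional progress. The function name runOnePhase and the task name "my_phase" are illustrative, not part of the commit, and the sketch assumes the instrumentation object has been constructed elsewhere.

```cpp
#include <api/CDataFrameAnalysisInstrumentation.h>

#include <thread>

// Hypothetical sketch (not the commit's code): drive one monitored phase.
void runOnePhase(ml::api::CDataFrameAnalysisInstrumentation& instrumentation) {
    // Poll and write progress updates; monitorProgress() only returns after
    // setToFinished() has been called.
    std::thread monitor{[&instrumentation] { instrumentation.monitorProgress(); }};

    // Starting a new monitored task resets the reported progress to zero.
    instrumentation.startNewProgressMonitoredTask("my_phase"); // task name is illustrative

    for (int i = 0; i < 100; ++i) {
        // ... perform roughly 1% of this phase's work ...
        // Increments are assumed to sum to one over the monitored task.
        instrumentation.updateProgress(0.01);
    }

    instrumentation.setToFinished();
    monitor.join();
}
```

Since m_FractionalProgress is atomic and the header notes that writeProgress() is thread safe (guarded by ms_ProgressMutex), worker threads can call updateProgress() concurrently with the polling thread.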
3 changes: 0 additions & 3 deletions include/api/CDataFrameAnalyzer.h
@@ -83,9 +83,6 @@ class API_EXPORT CDataFrameAnalyzer {
bool handleControlMessage(const TStrVec& fieldValues);
void captureFieldNames(const TStrVec& fieldNames);
void addRowToDataFrame(const TStrVec& fieldValues);
void monitorProgress(const CDataFrameAnalysisRunner& analysis,
core::CRapidJsonConcurrentLineWriter& writer) const;
void writeProgress(int progress, core::CRapidJsonConcurrentLineWriter& writer) const;
void writeResultsOf(const CDataFrameAnalysisRunner& analysis,
core::CRapidJsonConcurrentLineWriter& writer) const;

30 changes: 24 additions & 6 deletions include/maths/CBoostedTreeFactory.h
@@ -39,6 +39,15 @@ class MATHS_EXPORT CBoostedTreeFactory final {
using TLossFunctionUPtr = CBoostedTree::TLossFunctionUPtr;
using TAnalysisInstrumentationPtr = CDataFrameAnalysisInstrumentationInterface*;

public:
//! \name Instrumentation Phases
//@{
static const std::string FEATURE_SELECTION;
static const std::string COARSE_PARAMETER_SEARCH;
static const std::string FINE_TUNING_PARAMETERS;
static const std::string FINAL_TRAINING;
//@}

public:
//! Construct a boosted tree object from parameters.
static CBoostedTreeFactory constructFromParameters(std::size_t numberThreads,
@@ -189,14 +198,23 @@ class MATHS_EXPORT CBoostedTreeFactory final {
//! Get the number of hyperparameter tuning rounds to use.
std::size_t numberHyperparameterTuningRounds() const;

//! Setup monitoring for training progress.
void initializeTrainingProgressMonitoring(const core::CDataFrame& frame);
//! Start progress monitoring for feature selection.
void startProgressMonitoringFeatureSelection();

//! Refresh progress monitoring after restoring from saved training state.
void resumeRestoredTrainingProgressMonitoring();
//! Start progress monitoring for initializeHyperparameters.
void startProgressMonitoringInitializeHyperparameters(const core::CDataFrame& frame);

//! The total number of progress steps used in the main loop.
std::size_t mainLoopNumberSteps(double eta) const;
//! Skip progress monitoring for feature selection if we've restarted part
//! way through training.
//!
//! \note This makes sure we output that this task is complete.
void skipProgressMonitoringFeatureSelection();

//! Skip progress monitoring for initializeHyperparameters if we've restarted
//! part way through training.
//!
//! \note This makes sure we output that this task is complete.
void skipProgressMonitoringInitializeHyperparameters();

//! The maximum number of trees to use in the hyperparameter optimisation loop.
std::size_t mainLoopMaximumNumberTrees(double eta) const;
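The four phase constants above suggest the order in which boosted tree training is now reported. The following is a hedged sketch of that sequencing, not the commit's implementation: only the constants and startNewProgressMonitoredTask() come from the headers in this change; the free function announceTrainingPhases and the placement of the calls are assumptions.

```cpp
#include <maths/CBoostedTreeFactory.h>
#include <maths/CDataFrameAnalysisInstrumentationInterface.h>

// Hypothetical sketch: announce each training stage as its own monitored task.
void announceTrainingPhases(ml::maths::CDataFrameAnalysisInstrumentationInterface* instrumentation) {
    using TFactory = ml::maths::CBoostedTreeFactory;

    instrumentation->startNewProgressMonitoredTask(TFactory::FEATURE_SELECTION);
    // ... select features, reporting via updateProgress ...

    instrumentation->startNewProgressMonitoredTask(TFactory::COARSE_PARAMETER_SEARCH);
    // ... coarse search to initialize hyperparameters ...

    instrumentation->startNewProgressMonitoredTask(TFactory::FINE_TUNING_PARAMETERS);
    // ... hyperparameter tuning rounds ...

    instrumentation->startNewProgressMonitoredTask(TFactory::FINAL_TRAINING);
    // ... train the final forest with the chosen hyperparameters ...
}
```

Each call restarts the reported progress at zero, which is also why the skipProgressMonitoring* helpers exist: when training resumes from saved state they still report the earlier tasks as complete, so consumers see the full sequence of phases.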
6 changes: 6 additions & 0 deletions include/maths/CBoostedTreeImpl.h
@@ -290,6 +290,12 @@ class MATHS_EXPORT CBoostedTreeImpl final {
//! a good idea.
std::size_t maximumTreeSize(std::size_t numberRows) const;

//! Start monitoring fine tuning hyperparameters.
void startProgressMonitoringFineTuneHyperparameters();

//! Start monitoring the final model training.
void startProgressMonitoringFinalTrain();

//! Restore \p loss function pointer from the \p traverser.
static bool restoreLoss(TLossFunctionUPtr& loss, core::CStateRestoreTraverser& traverser);

36 changes: 27 additions & 9 deletions include/maths/CDataFrameAnalysisInstrumentationInterface.h
@@ -25,12 +25,19 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface {
public:
using TProgressCallback = std::function<void(double)>;
using TMemoryUsageCallback = std::function<void(std::int64_t)>;
using TStepCallback = std::function<void(const std::string&)>;
using TFlushCallback = std::function<void(const std::string&)>;

public:
virtual ~CDataFrameAnalysisInstrumentationInterface() = default;

//! Adds \p delta to the memory usage statistics.
virtual void updateMemoryUsage(std::int64_t delta) = 0;

//! Start progress monitoring for \p task.
//!
//! \note This resets the current progress to zero.
virtual void startNewProgressMonitoredTask(const std::string& task) = 0;

//! This adds \p fractionalProgress to the current progress.
//!
//! \note The caller should try to ensure that the sum of the values added
@@ -40,25 +47,34 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface {
//! than 0.001. In fact, it is unlikely that such high resolution is needed
//! and typically this would be called significantly less frequently.
virtual void updateProgress(double fractionalProgress) = 0;
//! Trigger the next step of the job. This will initiate writing the job state
//! to the results pipe.
virtual void nextStep(const std::string& phase = "") = 0;

//! Flush then reinitialize the instrumentation data. This will trigger
//! writing them to the results pipe.
virtual void flush(const std::string& tag = "") = 0;

//! Factory for the updateProgress() callback function object.
TProgressCallback progressCallback() {
return [this](double fractionalProgress) {
this->updateProgress(fractionalProgress);
};
}

//! Factory for the updateMemoryUsage() callback function object.
TMemoryUsageCallback memoryUsageCallback() {
return [this](std::int64_t delta) { this->updateMemoryUsage(delta); };
}
//! Factory for the nextStep() callback function object.
TStepCallback stepCallback() {
return [this](const std::string& phase) { this->nextStep(phase); };

//! Factory for the flush() callback function object.
TFlushCallback flushCallback() {
return [this](const std::string& tag) { this->flush(tag); };
}
};

//! \brief Instrumentation interface for Outlier Detection jobs.
//!
//! DESCRIPTION:\n
//! This interface extends CDataFrameAnalysisInstrumentationInterface with setters
//! for analysis parameters and elapsed time.
class MATHS_EXPORT CDataFrameOutliersInstrumentationInterface
: virtual public CDataFrameAnalysisInstrumentationInterface {
public:
@@ -130,8 +146,9 @@ class MATHS_EXPORT CDataFrameOutliersInstrumentationStub
: public CDataFrameOutliersInstrumentationInterface {
public:
void updateMemoryUsage(std::int64_t) override {}
void startNewProgressMonitoredTask(const std::string& /* task */) override {}
void updateProgress(double) override {}
void nextStep(const std::string& /* phase */) override {}
void flush(const std::string& /* tag */) override {}
void parameters(const maths::COutliers::SComputeParameters& /* parameters */) override {}
void elapsedTime(std::uint64_t /* time */) override {}
void featureInfluenceThreshold(double /* featureInfluenceThreshold */) override {}
Expand All @@ -142,8 +159,9 @@ class MATHS_EXPORT CDataFrameTrainBoostedTreeInstrumentationStub
: public CDataFrameTrainBoostedTreeInstrumentationInterface {
public:
void updateMemoryUsage(std::int64_t) override {}
void startNewProgressMonitoredTask(const std::string& /* task */) override {}
void updateProgress(double) override {}
void nextStep(const std::string& /* phase */) override {}
void flush(const std::string& /* tag */) override {}
void type(EStatsType /* type */) override {}
void iteration(std::size_t /* iteration */) override {}
void iterationTime(std::uint64_t /* delta */) override {}
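The callback factories in the interface above let lower-level maths code report progress, memory deltas and flushes without depending on the api layer. A minimal usage sketch, assuming the caller supplies the interface reference; the function name, the memory delta and the flush tag are illustrative.

```cpp
#include <maths/CDataFrameAnalysisInstrumentationInterface.h>

// Hypothetical sketch: hand the instrumentation's callbacks to a worker.
void wireCallbacks(ml::maths::CDataFrameAnalysisInstrumentationInterface& instrumentation) {
    auto recordProgress = instrumentation.progressCallback();       // std::function<void(double)>
    auto recordMemoryUsage = instrumentation.memoryUsageCallback(); // std::function<void(std::int64_t)>
    auto flushResults = instrumentation.flushCallback();            // std::function<void(const std::string&)>

    recordProgress(0.25);           // add 25% to the current monitored task
    recordMemoryUsage(1024);        // memory delta in bytes (sign convention assumed)
    flushResults("phase_boundary"); // tag is illustrative
}
```

Note that the stepCallback()/TStepCallback pair backing the old nextStep() API is gone: flushes are now requested explicitly via flushCallback(), and phase changes are expressed through startNewProgressMonitoredTask().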
5 changes: 5 additions & 0 deletions include/maths/CDataFrameCategoryEncoder.h
@@ -271,6 +271,7 @@ class MATHS_EXPORT CMakeDataFrameCategoryEncoder {
using TSizeVec = std::vector<std::size_t>;
using TOptionalDouble = boost::optional<double>;
using TEncodingUPtrVec = CDataFrameCategoryEncoder::TEncodingUPtrVec;
using TProgressCallback = std::function<void(double)>;

public:
//! The minimum number of training rows needed per feature used.
@@ -322,6 +323,9 @@ class MATHS_EXPORT CMakeDataFrameCategoryEncoder {
//! Set a mask of the columns to include.
CMakeDataFrameCategoryEncoder& columnMask(TSizeVec columnMask);

//! Set a callback to monitor progress.
CMakeDataFrameCategoryEncoder& progressCallback(TProgressCallback callback);

//! Make the encoding.
virtual TEncodingUPtrVec makeEncodings();

@@ -386,6 +390,7 @@ class MATHS_EXPORT CMakeDataFrameCategoryEncoder {
TDoubleVec m_EncodedColumnMics;
TSizeVec m_EncodedColumnInputColumnMap;
TSizeVec m_EncodedColumnEncodingMap;
TProgressCallback m_RecordProgress;
};
}
}
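With the new fluent setter, category encoding can feed its progress into the same instrumentation. A short sketch under the assumption that the builder has already been configured elsewhere; makeEncodingsWithProgress is a hypothetical helper, not part of the commit.

```cpp
#include <maths/CDataFrameAnalysisInstrumentationInterface.h>
#include <maths/CDataFrameCategoryEncoder.h>

// Hypothetical sketch: route the encoder's progress into the instrumentation.
void makeEncodingsWithProgress(ml::maths::CMakeDataFrameCategoryEncoder& encoderBuilder,
                               ml::maths::CDataFrameAnalysisInstrumentationInterface& instrumentation) {
    encoderBuilder.progressCallback(instrumentation.progressCallback());
    auto encodings = encoderBuilder.makeEncodings();
    // ... pass the encodings on to the boosted tree factory ...
    (void)encodings; // silence unused-variable warnings in this sketch
}
```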
6 changes: 6 additions & 0 deletions include/maths/COutliers.h
@@ -659,11 +659,17 @@ class MATHS_EXPORT COutliers : private core::CNonInstantiatable {
template<typename POINT>
using TAnnotatedPoint = CAnnotatedVector<POINT, std::size_t>;

//! \name Method Names
//@{
static const std::string LOF;
static const std::string LDOF;
static const std::string DISTANCE_KNN;
static const std::string TOTAL_DISTANCE_KNN;
static const std::string ENSEMBLE;
//@}

//! Instrumentation phase.
static const std::string COMPUTING_OUTLIERS;

//! The outlier detection methods which are available.
enum EMethod {
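Outlier detection gains a single monitored phase, COMPUTING_OUTLIERS. A minimal sketch, assuming the constant is publicly accessible on COutliers and that the caller owns the instrumentation object; announceOutlierPhase is a hypothetical helper.

```cpp
#include <maths/CDataFrameAnalysisInstrumentationInterface.h>
#include <maths/COutliers.h>

// Hypothetical sketch: announce the outlier detection phase before the work starts.
void announceOutlierPhase(ml::maths::CDataFrameOutliersInstrumentationInterface& instrumentation) {
    // Assumes COMPUTING_OUTLIERS is publicly accessible.
    instrumentation.startNewProgressMonitoredTask(ml::maths::COutliers::COMPUTING_OUTLIERS);
    // ... run the outlier ensemble, reporting via updateProgress ...
}
```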