Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Multiphase progress reporting for data frame analyses #1179

Merged
merged 13 commits into from
May 2, 2020
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
operations. (See {ml-pull}1142[#1142].)
* Fix spurious anomalies for count and sum functions after no data are received for long
periods of time. (See {ml-pull}1158[#1158].)
* Break progress reporting of data frame analyses into multiple phases. (See {ml-pull}1179[#1179].)

== {es} version 7.8.0

Expand Down
34 changes: 26 additions & 8 deletions include/api/CDataFrameAnalysisInstrumentation.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

namespace ml {
Expand Down Expand Up @@ -54,6 +55,11 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
//! Adds \p delta to the memory usage statistics.
void updateMemoryUsage(std::int64_t delta) override;

//! Start progress monitoring for \p task.
//!
//! \note This resets the current progress to zero.
void startNewProgressMonitoredTask(const std::string& task) override;

//! This adds \p fractionalProgress to the current progress.
//!
//! \note The caller should try to ensure that the sum of the values added
Expand All @@ -64,6 +70,9 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
//! and typically this would be called significantly less frequently.
void updateProgress(double fractionalProgress) override;

//! Reset variables related to the job progress.
void resetProgress();

//! Record that the analysis is complete.
void setToFinished();

Expand All @@ -74,13 +83,14 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
//! of the proportion of total work complete for a single run.
double progress() const;

//! Reset variables related to the job progress.
void resetProgress();
//! Start polling and writing progress updates.
//!
//! \note This doesn't return until setToFinished is called.
void monitorProgress();

//! Trigger the next step of the job. This will initiate writing the job state
//! to the results pipe.
//! \todo use \p phase to tag different phases of the analysis job.
void nextStep(const std::string& phase = "") override;
//! Flush then reinitialize the instrumentation data. This will trigger
//! writing them to the results pipe.
void flush(const std::string& tag = "") override;

//! \return The peak memory usage.
std::int64_t memory() const;
Expand All @@ -97,15 +107,23 @@ class API_EXPORT CDataFrameAnalysisInstrumentation
TWriter* writer();

private:
void writeMemory(std::int64_t timestamp);
static const std::string NO_TASK;

private:
int percentageProgress() const;
virtual void writeAnalysisStats(std::int64_t timestamp) = 0;
virtual void writeState();
void writeMemoryAndAnalysisStats();
// Note: this method is thread-safe.
void writeProgress(const std::string& task, int progress);
void writeMemory(std::int64_t timestamp);

private:
std::string m_JobId;
std::string m_ProgressMonitoredTask;
std::atomic_bool m_Finished;
std::atomic_size_t m_FractionalProgress;
std::atomic<std::int64_t> m_Memory;
static std::mutex ms_ProgressMutex;
TWriterUPtr m_Writer;
};

Expand Down
3 changes: 0 additions & 3 deletions include/api/CDataFrameAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ class API_EXPORT CDataFrameAnalyzer {
bool handleControlMessage(const TStrVec& fieldValues);
void captureFieldNames(const TStrVec& fieldNames);
void addRowToDataFrame(const TStrVec& fieldValues);
void monitorProgress(const CDataFrameAnalysisRunner& analysis,
core::CRapidJsonConcurrentLineWriter& writer) const;
void writeProgress(int progress, core::CRapidJsonConcurrentLineWriter& writer) const;
void writeResultsOf(const CDataFrameAnalysisRunner& analysis,
core::CRapidJsonConcurrentLineWriter& writer) const;

Expand Down
30 changes: 24 additions & 6 deletions include/maths/CBoostedTreeFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ class MATHS_EXPORT CBoostedTreeFactory final {
using TLossFunctionUPtr = CBoostedTree::TLossFunctionUPtr;
using TAnalysisInstrumentationPtr = CDataFrameAnalysisInstrumentationInterface*;

public:
//! \name Instrumentation Phases
//@{
static const std::string FEATURE_SELECTION;
static const std::string COARSE_PARAMETER_SEARCH;
static const std::string FINE_TUNING_PARAMETERS;
static const std::string FINAL_TRAINING;
//@}

public:
//! Construct a boosted tree object from parameters.
static CBoostedTreeFactory constructFromParameters(std::size_t numberThreads,
Expand Down Expand Up @@ -189,14 +198,23 @@ class MATHS_EXPORT CBoostedTreeFactory final {
//! Get the number of hyperparameter tuning rounds to use.
std::size_t numberHyperparameterTuningRounds() const;

//! Setup monitoring for training progress.
void initializeTrainingProgressMonitoring(const core::CDataFrame& frame);
//! Start progress monitoring feature selection.
void startProgressMonitoringFeatureSelection();

//! Refresh progress monitoring after restoring from saved training state.
void resumeRestoredTrainingProgressMonitoring();
//! Start progress monitoring initializeHyperparameters.
void startProgressMonitoringInitializeHyperparameters(const core::CDataFrame& frame);

//! The total number of progress steps used in the main loop.
std::size_t mainLoopNumberSteps(double eta) const;
//! Skip progress monitoring for feature selection if we've restarted part
//! way through training.
//!
//! \note This makes sure we output that this task is complete.
void skipProgressMonitoringFeatureSelection();

//! Skip progress monitoring for initializeHyperparameters if we've restarted
//! part way through training.
//!
//! \note This makes sure we output that this task is complete.
void skipProgressMonitoringInitializeHyperparameters();

//! The maximum number of trees to use in the hyperparameter optimisation loop.
std::size_t mainLoopMaximumNumberTrees(double eta) const;
Expand Down
6 changes: 6 additions & 0 deletions include/maths/CBoostedTreeImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ class MATHS_EXPORT CBoostedTreeImpl final {
//! a good idea.
std::size_t maximumTreeSize(std::size_t numberRows) const;

//! Start monitoring fine tuning hyperparameters.
void startProgressMonitoringFineTuneHyperparameters();

//! Start monitoring the final model training.
void startProgressMonitoringFinalTrain();

//! Restore \p loss function pointer from the \p traverser.
static bool restoreLoss(TLossFunctionUPtr& loss, core::CStateRestoreTraverser& traverser);

Expand Down
36 changes: 27 additions & 9 deletions include/maths/CDataFrameAnalysisInstrumentationInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface {
public:
using TProgressCallback = std::function<void(double)>;
using TMemoryUsageCallback = std::function<void(std::int64_t)>;
using TStepCallback = std::function<void(const std::string&)>;
using TFlushCallback = std::function<void(const std::string&)>;

public:
virtual ~CDataFrameAnalysisInstrumentationInterface() = default;

//! Adds \p delta to the memory usage statistics.
virtual void updateMemoryUsage(std::int64_t delta) = 0;

//! Start progress monitoring for \p task.
//!
//! \note This resets the current progress to zero.
virtual void startNewProgressMonitoredTask(const std::string& task) = 0;

//! This adds \p fractionalProgress to the current progress.
//!
//! \note The caller should try to ensure that the sum of the values added
Expand All @@ -40,25 +47,34 @@ class MATHS_EXPORT CDataFrameAnalysisInstrumentationInterface {
//! than 0.001. In fact, it is unlikely that such high resolution is needed
//! and typically this would be called significantly less frequently.
virtual void updateProgress(double fractionalProgress) = 0;
//! Trigger the next step of the job. This will initiate writing the job state
//! to the results pipe.
virtual void nextStep(const std::string& phase = "") = 0;

//! Flush then reinitialize the instrumentation data. This will trigger
//! writing them to the results pipe.
virtual void flush(const std::string& tag = "") = 0;

//! Factory for the updateProgress() callback function object.
TProgressCallback progressCallback() {
return [this](double fractionalProgress) {
this->updateProgress(fractionalProgress);
};
}

//! Factory for the updateMemoryUsage() callback function object.
TMemoryUsageCallback memoryUsageCallback() {
return [this](std::int64_t delta) { this->updateMemoryUsage(delta); };
}
//! Factory for the nextStep() callback function object.
TStepCallback stepCallback() {
return [this](const std::string& phase) { this->nextStep(phase); };

//! Factory for the flush() callback function object.
TFlushCallback flushCallback() {
return [this](const std::string& tag) { this->flush(tag); };
}
};

//! \brief Instrumentation interface for Outlier Detection jobs.
//!
//! DESCRIPTION:\n
//! This interface extends CDataFrameAnalysisInstrumentationInterface with setters
//! for analysis parameters and elapsed time.
class MATHS_EXPORT CDataFrameOutliersInstrumentationInterface
: virtual public CDataFrameAnalysisInstrumentationInterface {
public:
Expand Down Expand Up @@ -130,8 +146,9 @@ class MATHS_EXPORT CDataFrameOutliersInstrumentationStub
: public CDataFrameOutliersInstrumentationInterface {
public:
void updateMemoryUsage(std::int64_t) override {}
void startNewProgressMonitoredTask(const std::string& /* task */) override {}
void updateProgress(double) override {}
void nextStep(const std::string& /* phase */) override {}
void flush(const std::string& /* tag */) override {}
void parameters(const maths::COutliers::SComputeParameters& /* parameters */) override {}
void elapsedTime(std::uint64_t /* time */) override {}
void featureInfluenceThreshold(double /* featureInfluenceThreshold */) override {}
Expand All @@ -142,8 +159,9 @@ class MATHS_EXPORT CDataFrameTrainBoostedTreeInstrumentationStub
: public CDataFrameTrainBoostedTreeInstrumentationInterface {
public:
void updateMemoryUsage(std::int64_t) override {}
void startNewProgressMonitoredTask(const std::string& /* task */) override {}
void updateProgress(double) override {}
void nextStep(const std::string& /* phase */) override {}
void flush(const std::string& /* tag */) override {}
void type(EStatsType /* type */) override {}
void iteration(std::size_t /* iteration */) override {}
void iterationTime(std::uint64_t /* delta */) override {}
Expand Down
5 changes: 5 additions & 0 deletions include/maths/CDataFrameCategoryEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ class MATHS_EXPORT CMakeDataFrameCategoryEncoder {
using TSizeVec = std::vector<std::size_t>;
using TOptionalDouble = boost::optional<double>;
using TEncodingUPtrVec = CDataFrameCategoryEncoder::TEncodingUPtrVec;
using TProgressCallback = std::function<void(double)>;

public:
//! The minimum number of training rows needed per feature used.
Expand Down Expand Up @@ -322,6 +323,9 @@ class MATHS_EXPORT CMakeDataFrameCategoryEncoder {
//! Set a mask of the columns to include.
CMakeDataFrameCategoryEncoder& columnMask(TSizeVec columnMask);

//! Set a callback to monitor progress.
CMakeDataFrameCategoryEncoder& progressCallback(TProgressCallback callback);

//! Make the encoding.
virtual TEncodingUPtrVec makeEncodings();

Expand Down Expand Up @@ -386,6 +390,7 @@ class MATHS_EXPORT CMakeDataFrameCategoryEncoder {
TDoubleVec m_EncodedColumnMics;
TSizeVec m_EncodedColumnInputColumnMap;
TSizeVec m_EncodedColumnEncodingMap;
TProgressCallback m_RecordProgress;
};
}
}
Expand Down
6 changes: 6 additions & 0 deletions include/maths/COutliers.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,17 @@ class MATHS_EXPORT COutliers : private core::CNonInstantiatable {
template<typename POINT>
using TAnnotatedPoint = CAnnotatedVector<POINT, std::size_t>;

//! \name Method Names
//@{
static const std::string LOF;
static const std::string LDOF;
static const std::string DISTANCE_KNN;
static const std::string TOTAL_DISTANCE_KNN;
static const std::string ENSEMBLE;
//@}

//! Instrumentation phase.
static const std::string COMPUTING_OUTLIERS;

//! The outlier detection methods which are available.
enum EMethod {
Expand Down
Loading