Skip to content

Commit

Permalink
[6.7][ML] Improve autodetect logic for persistence (elastic#437)
Browse files Browse the repository at this point in the history
Changed the logic surrounding persistence of both state and quantiles on
graceful shutdown so that persistence only occurs if and only if at
least one input record has been processed or time has been advanced.

closes elastic#393
  • Loading branch information
edsavage committed Mar 18, 2019
1 parent 9d03ecd commit 5b493da
Show file tree
Hide file tree
Showing 17 changed files with 183 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Adjust seccomp filter for Fedora 29. {ml-pull}354[#354]
=== Bug Fixes
=== Regressions
* Improve autodetect logic for persistence. {ml-pull}437[#437]
== {es} version 6.6.2
Expand Down
6 changes: 6 additions & 0 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! How many records did we handle?
virtual uint64_t numRecordsHandled() const;

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! Log a list of the detectors and keys
void description() const;

Expand Down Expand Up @@ -454,6 +457,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
//! The hierarchical results normalizer.
model::CHierarchicalResultsNormalizer m_Normalizer;

//! Flag indicating whether or not time has been advanced.
bool m_TimeAdvanced{false};

friend class ::CBackgroundPersisterTest;
friend class ::CAnomalyJobTest;
};
Expand Down
3 changes: 3 additions & 0 deletions include/api/CDataProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class API_EXPORT CDataProcessor : private core::CNonCopyable {
//! Access the output handler
virtual COutputHandler& outputHandler() = 0;

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const = 0;

//! Create debug for a record. This is expensive so should NOT be
//! called for every record as a matter of course.
static std::string debugPrintRecord(const TStrStrUMap& dataRowFields);
Expand Down
3 changes: 3 additions & 0 deletions include/api/CFieldDataTyper.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class API_EXPORT CFieldDataTyper : public CDataProcessor {
virtual bool restoreState(core::CDataSearcher& restoreSearcher,
core_t::TTime& completeToTime);

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! Persist current state
virtual bool persistState(core::CDataAdder& persister);

Expand Down
3 changes: 3 additions & 0 deletions include/api/COutputChainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class API_EXPORT COutputChainer : public COutputHandler {
//! Persist current state due to the periodic persistence being triggered.
virtual bool periodicPersistState(CBackgroundPersister& persister);

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! The chainer does consume control messages, because it passes them on
//! to whatever processor it's chained to.
virtual bool consumesControlMessages();
Expand Down
3 changes: 3 additions & 0 deletions include/api/COutputHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class API_EXPORT COutputHandler : private core::CNonCopyable {
//! Persist current state due to the periodic persistence being triggered.
virtual bool periodicPersistState(CBackgroundPersister& persister);

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! Does this handler deal with control messages?
virtual bool consumesControlMessages();

Expand Down
3 changes: 3 additions & 0 deletions include/config/CAutoconfigurer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class CONFIG_EXPORT CAutoconfigurer : public api::CDataProcessor {
//! Generate the report.
virtual void finalise();

//! Is persistence needed?
virtual bool isPersistenceNeeded(const std::string& description) const;

//! No-op.
virtual bool restoreState(core::CDataSearcher& restoreSearcher,
core_t::TTime& completeToTime);
Expand Down
18 changes: 16 additions & 2 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields) {
}

void CAnomalyJob::finalise() {
// Persist final state of normalizer
m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime);
// Persist final state of normalizer iff an input record has been handled or time has been advanced.
if (this->isPersistenceNeeded("quantiles state")) {
m_JsonOutputWriter.persistNormalizer(m_Normalizer, m_LastNormalizerPersistTime);
}

// Prune the models so that the final persisted state is as neat as possible
this->pruneAllModels();
Expand Down Expand Up @@ -392,11 +394,23 @@ void CAnomalyJob::advanceTime(const std::string& time_) {
LOG_TRACE(<< "Received request to advance time to " << time);
}

m_TimeAdvanced = true;

this->outputBucketResultsUntil(time);

this->timeNow(time);
}

bool CAnomalyJob::isPersistenceNeeded(const std::string& description) const {
if ((m_NumRecordsHandled == 0) && (m_TimeAdvanced == false)) {
LOG_DEBUG(<< "Will not attempt to persist " << description
<< ". Zero records were handled and time has not been advanced.");
return false;
}

return true;
}

void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
// If the bucket time has increased, output results for all field names
core_t::TTime bucketLength = m_ModelConfig.bucketLength();
Expand Down
3 changes: 1 addition & 2 deletions lib/api/CCmdSkeleton.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ bool CCmdSkeleton::persistState() {
return true;
}

if (m_Processor.numRecordsHandled() == 0) {
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist state");
if (m_Processor.isPersistenceNeeded("state") == false) {
return true;
}

Expand Down
14 changes: 14 additions & 0 deletions lib/api/CFieldDataTyper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,20 @@ bool CFieldDataTyper::persistState(core::CDataAdder& persister) {
return this->doPersistState(m_DataTyper->makePersistFunc(), m_ExamplesCollector, persister);
}

bool CFieldDataTyper::isPersistenceNeeded(const std::string& description) const {
// Pass on the request in case we're chained
if (m_OutputHandler.isPersistenceNeeded(description)) {
return true;
}

if (m_NumRecordsHandled == 0) {
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist "
<< description << ".");
return false;
}
return true;
}

bool CFieldDataTyper::doPersistState(const CDataTyper::TPersistFunc& dataTyperPersistFunc,
const CCategoryExamplesCollector& examplesCollector,
core::CDataAdder& persister) {
Expand Down
4 changes: 4 additions & 0 deletions lib/api/COutputChainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ bool COutputChainer::periodicPersistState(CBackgroundPersister& persister) {
return m_DataProcessor.periodicPersistState(persister);
}

bool COutputChainer::isPersistenceNeeded(const std::string& description) const {
return m_DataProcessor.isPersistenceNeeded(description);
}

bool COutputChainer::consumesControlMessages() {
return true;
}
Expand Down
5 changes: 5 additions & 0 deletions lib/api/COutputHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ bool COutputHandler::periodicPersistState(CBackgroundPersister& /* persister */)
return true;
}

bool COutputHandler::isPersistenceNeeded(const std::string& /*description*/) const {
// NOOP unless overridden
return false;
}

COutputHandler::CPreComputedHash::CPreComputedHash(size_t hash) : m_Hash(hash) {
}

Expand Down
105 changes: 105 additions & 0 deletions lib/api/unittest/CAnomalyJobTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,109 @@ void CAnomalyJobTest::testSkipTimeControlMessage() {
CPPUNIT_ASSERT_EQUAL(std::size_t(11), countBuckets("bucket", outputStrm.str() + "]"));
}

void CAnomalyJobTest::testIsPersistenceNeeded() {

model::CLimits limits;
api::CFieldConfig fieldConfig;
api::CFieldConfig::TStrVec clauses;
clauses.push_back("count");
fieldConfig.initFromClause(clauses);
model::CAnomalyDetectorModelConfig modelConfig =
model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE);

{
// check that persistence is not needed if no input records have been handled
// and the time has not been advanced

std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);

CPPUNIT_ASSERT_EQUAL(false, job.isPersistenceNeeded("test state"));

job.finalise();
wrappedOutputStream.syncFlush();

std::string output = outputStrm.str();
LOG_DEBUG(<< "Output has yielded: " << output);

// check that no quantile state was persisted
core::CRegex regex;
regex.init("\n");
core::CRegex::TStrVec lines;
regex.split(output, lines);
CPPUNIT_ASSERT_EQUAL(false, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
lines));
}

core_t::TTime time = 3600;
{
// check that persistence is needed if an input record has been handled

std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);

api::CAnomalyJob::TStrStrUMap dataRows;

std::ostringstream ss;
ss << time;
dataRows["time"] = ss.str();
CPPUNIT_ASSERT(job.handleRecord(dataRows));

CPPUNIT_ASSERT_EQUAL(true, job.isPersistenceNeeded("test state"));

job.finalise();
wrappedOutputStream.syncFlush();

std::string output = outputStrm.str();
LOG_DEBUG(<< "Output has yielded: " << output);

// check that the quantile state has actually been persisted
core::CRegex regex;
regex.init("\n");
core::CRegex::TStrVec lines;
regex.split(output, lines);
CPPUNIT_ASSERT_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
lines));
}

{
// check that persistence is needed if time has been advanced (via a control message)
// even if no input data has been handled

std::stringstream outputStrm;
core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

api::CAnomalyJob job("job", limits, fieldConfig, modelConfig, wrappedOutputStream);

api::CAnomalyJob::TStrStrUMap dataRows;

time = 39600;
dataRows["."] = "t39600";
CPPUNIT_ASSERT(job.handleRecord(dataRows));
CPPUNIT_ASSERT(job.isPersistenceNeeded("test state"));

CPPUNIT_ASSERT_EQUAL(true, job.isPersistenceNeeded("test state"));

job.finalise();
wrappedOutputStream.syncFlush();

std::string output = outputStrm.str();
LOG_DEBUG(<< "Output has yielded: " << output);

// check that the quantile state has actually been persisted
core::CRegex regex;
regex.init("\n");
core::CRegex::TStrVec lines;
regex.split(output, lines);
CPPUNIT_ASSERT_EQUAL(true, findLine("\"quantiles\":{\"job_id\":\"job\",\"quantile_state\".*",
lines));
}
}

void CAnomalyJobTest::testModelPlot() {
core_t::TTime bucketSize = 10000;
model::CLimits limits;
Expand Down Expand Up @@ -651,6 +754,8 @@ CppUnit::Test* CAnomalyJobTest::suite() {
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
"CAnomalyJobTest::testSkipTimeControlMessage",
&CAnomalyJobTest::testSkipTimeControlMessage));
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
"CAnomalyJobTest::testIsPersistenceNeeded", &CAnomalyJobTest::testIsPersistenceNeeded));
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
"CAnomalyJobTest::testModelPlot", &CAnomalyJobTest::testModelPlot));
suiteOfTests->addTest(new CppUnit::TestCaller<CAnomalyJobTest>(
Expand Down
1 change: 1 addition & 0 deletions lib/api/unittest/CAnomalyJobTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class CAnomalyJobTest : public CppUnit::TestFixture {
void testOutOfSequence();
void testControlMessages();
void testSkipTimeControlMessage();
void testIsPersistenceNeeded();
void testModelPlot();
void testInterimResultEdgeCases();
void testRestoreFailsWithEmptyStream();
Expand Down
9 changes: 9 additions & 0 deletions lib/api/unittest/CMockDataProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ bool CMockDataProcessor::handleRecord(const TStrStrUMap& dataRowFields) {
void CMockDataProcessor::finalise() {
}

bool CMockDataProcessor::isPersistenceNeeded(const std::string& description) const {
if (m_NumRecordsHandled == 0) {
LOG_DEBUG(<< "Zero records were handled - will not attempt to persist "
<< description << ".");
return false;
}
return true;
}

bool CMockDataProcessor::restoreState(ml::core::CDataSearcher& restoreSearcher,
ml::core_t::TTime& completeToTime) {
// Pass on the request in case we're chained
Expand Down
2 changes: 2 additions & 0 deletions lib/api/unittest/CMockDataProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class CMockDataProcessor : public ml::api::CDataProcessor {

virtual void finalise();

virtual bool isPersistenceNeeded(const std::string& description) const;

//! Restore previously saved state
virtual bool restoreState(ml::core::CDataSearcher& restoreSearcher,
ml::core_t::TTime& completeToTime);
Expand Down
4 changes: 4 additions & 0 deletions lib/config/CAutoconfigurer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ void CAutoconfigurer::finalise() {
m_Impl->finalise();
}

bool CAutoconfigurer::isPersistenceNeeded(const std::string& /*description*/) const {
return false;
}

bool CAutoconfigurer::restoreState(core::CDataSearcher& /*restoreSearcher*/,
core_t::TTime& /*completeToTime*/) {
return true;
Expand Down

0 comments on commit 5b493da

Please sign in to comment.