Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Store anomaly detection config file and input on demand #110582

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class Job implements SimpleDiffable<Job>, Writeable, ToXContentObject {
public static final ParseField ANALYSIS_LIMITS = new ParseField("analysis_limits");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField CUSTOM_SETTINGS = new ParseField("custom_settings");
// Not read as a top-level job field in the code visible here: keepJobData() uses the
// preferred name of this field as a key into the custom settings map.
public static final ParseField KEEP_JOB_DATA = new ParseField("keep_job_data");
public static final ParseField DATA_DESCRIPTION = new ParseField("data_description");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField FINISHED_TIME = new ParseField("finished_time");
Expand Down Expand Up @@ -478,6 +479,18 @@ public Map<String, Object> getCustomSettings() {
return customSettings;
}

/**
 * Reports whether the job's custom settings ask for the job data to be kept.
 * <p>
 * The value stored under the {@code keep_job_data} key is honoured when it is a
 * {@link Boolean}, or a {@link String} equal to "true" ignoring case. Any other
 * value, a missing key, or absent custom settings all yield {@code false}.
 *
 * @return {@code true} only when the custom setting is present and truthy; never {@code null}
 */
public Boolean keepJobData() {
    if (customSettings == null) {
        return false;
    }
    // A single lookup is enough: a missing key and a null value both fall through to false.
    final Object setting = customSettings.get(KEEP_JOB_DATA.getPreferredName());
    if (setting instanceof Boolean) {
        return (Boolean) setting;
    }
    if (setting instanceof String) {
        return "true".equalsIgnoreCase((String) setting);
    }
    return false;
}

/**
 * Returns the value of the {@code modelSnapshotId} field unchanged.
 * NOTE(review): whether this can be {@code null} is not visible from here - confirm against the builder.
 *
 * @return the model snapshot id stored on this job
 */
public String getModelSnapshotId() {
return modelSnapshotId;
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugin/ml/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ esplugin.bundleSpec.exclude 'platform/licenses/**'
}

dependencies {
testImplementation project(path: ':x-pack:plugin:inference')
implementation project(path: ':libs:elasticsearch-logging')
testImplementation project(path: ':x-pack:plugin:inference')
compileOnly project(':modules:lang-painless:spi')
compileOnly project(path: xpackModule('core'))
compileOnly project(path: xpackModule('autoscaling'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ public void build() throws IOException, InterruptedException {
buildQuantiles(command);

processPipes.addArgs(command);

// log the command
if (job.keepJobData()) {
// log the command to reproduce the autodetect run
logger.info("Autodetect process command: " + command);
}
controller.startProcess(command);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
import org.elasticsearch.xpack.ml.utils.FileUtils;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -110,11 +112,19 @@ public AutodetectProcess createAutodetectProcess(
nativeController,
processPipes,
numberOfFields,
filesToDelete,
(job.keepJobData() == false) ? filesToDelete : new ArrayList<>(),
resultsParser,
onProcessCrash
);

try {
// Check whether the job's custom settings contain 'keep_job_data'; if it is set to true,
// create the autodetect control message file.
if (job.keepJobData()) {
FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile());
Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json");
autodetect.setControlMessageFilePath(controlMsgFile);
}
autodetect.start(executorService, stateProcessor);
return autodetect;
} catch (IOException | EsRejectedExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private volatile boolean processCloseInitiated;
private volatile boolean processKilled;
private volatile boolean isReady;
private volatile Path controlMessageFilePath;

protected AbstractNativeProcess(
String jobId,
Expand Down Expand Up @@ -107,7 +108,7 @@ public void start(ExecutorService executorService) throws IOException {
processPipes.connectOtherStreams();
if (processPipes.getProcessInStream().isPresent()) {
processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get()));
this.recordWriter.set(new LengthEncodedWriter(processInStream.get()));
this.recordWriter.set(new LengthEncodedWriter(processInStream.get(), getControlMessageFilePath()));
}
processOutStream.set(processPipes.getProcessOutStream().orElse(null));
processRestoreStream.set(processPipes.getRestoreStream().orElse(null));
Expand Down Expand Up @@ -348,4 +349,14 @@ public void consumeAndCloseOutputStream() {
// Given we are closing down the process there is no point propagating IO exceptions here
}
}

/**
 * Returns the path most recently passed to {@code setControlMessageFilePath}.
 * The field has no initializer, so this is {@code null} until a path has been set.
 *
 * @return the configured control message file path, or {@code null} if none was set
 */
@Nullable
public Path getControlMessageFilePath() {
return controlMessageFilePath;
}

/**
 * Stores the path returned by {@code getControlMessageFilePath}.
 * <p>
 * {@code start(ExecutorService)} reads this path at the moment it creates the record writer,
 * so a value set after the process has been started does not affect the writer already created.
 *
 * @param controlMessageFilePath the file path to hand to the record writer; may be {@code null}
 */
public void setControlMessageFilePath(Path controlMessageFilePath) {
this.controlMessageFilePath = controlMessageFilePath;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
*/
package org.elasticsearch.xpack.ml.process.writer;

import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/**
Expand All @@ -27,18 +31,49 @@
*/
public class LengthEncodedWriter implements RecordWriter {
// Primary destination for the length-encoded records. Never closed by this class.
private OutputStream outputStream;
// When the custom setting "keep_job_data" is set to true, the data is written to a file
// in addition to the output stream. Null when no file path was supplied or the file
// could not be opened.
private OutputStream fileOutputStream;
// Reused 4-byte buffer holding the big-endian encoding of each length / field count.
private ByteBuffer lengthBuffer;

// One logger per class rather than per writer instance.
private static final Logger logger = LogManager.getLogger(LengthEncodedWriter.class);

/**
 * Create the writer on the OutputStream <code>os</code>.
 * This object will never close <code>os</code>.
 * <p>
 * Delegates to the two-argument constructor with a <code>null</code> file path,
 * so no additional file copy of the written data is produced.
 *
 * @param os the stream that receives the length-encoded records
 */
public LengthEncodedWriter(OutputStream os) {
this(os, null);
}

public LengthEncodedWriter(OutputStream os, Path filePath) {
outputStream = os;
try {
if (filePath != null) {
logger.info("Opening file: " + filePath + " for writing.");
fileOutputStream = Files.newOutputStream(filePath);
} else {
fileOutputStream = null;
}
} catch (IOException e) {
logger.error("Failed to open file: " + filePath + " for writing.", e.getMessage(), e);
fileOutputStream = null;
}
// This will be used to convert 32 bit integers to network byte order
lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int)
}

/**
 * Closes the optional file stream, if one was opened. The primary output stream is
 * left untouched.
 * <p>
 * Calling this more than once is harmless. After it returns, the file stream reference
 * is cleared, so later writes and flushes skip the file instead of failing on a closed stream.
 * A failure to close the file is logged and not propagated.
 */
public void close() {
    final OutputStream fileStream = fileOutputStream;
    if (fileStream == null) {
        return;
    }
    // Clear the field before closing so the stream is never reused, even if close() throws.
    fileOutputStream = null;
    try {
        fileStream.close();
    } catch (IOException e) {
        logger.error("Failed to close file output stream.", e);
    }
}

/**
* Convert each String in the record array to a length/value encoded pair
* and write to the outputstream.
Expand Down Expand Up @@ -75,6 +110,9 @@ public void writeNumFields(int numFields) throws IOException {
lengthBuffer.clear();
lengthBuffer.putInt(numFields);
outputStream.write(lengthBuffer.array());
if (fileOutputStream != null) {
fileOutputStream.write(lengthBuffer.array());
}
}

/**
Expand All @@ -87,10 +125,17 @@ public void writeField(String field) throws IOException {
lengthBuffer.putInt(utf8Bytes.length);
outputStream.write(lengthBuffer.array());
outputStream.write(utf8Bytes);
if (fileOutputStream != null) {
fileOutputStream.write(lengthBuffer.array());
fileOutputStream.write(utf8Bytes);
}
}

/**
 * Flushes the primary output stream and then, when a file copy is being kept,
 * the file stream as well.
 *
 * @throws IOException if flushing either stream fails
 */
@Override
public void flush() throws IOException {
    outputStream.flush();
    if (fileOutputStream == null) {
        // No file copy is being kept; nothing more to flush.
        return;
    }
    fileOutputStream.flush();
}
}