diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 8da0209e10293..00f03c5247162 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -77,6 +77,7 @@ public class Job implements SimpleDiffable, Writeable, ToXContentObject { public static final ParseField ANALYSIS_LIMITS = new ParseField("analysis_limits"); public static final ParseField CREATE_TIME = new ParseField("create_time"); public static final ParseField CUSTOM_SETTINGS = new ParseField("custom_settings"); + public static final ParseField KEEP_JOB_DATA = new ParseField("keep_job_data"); public static final ParseField DATA_DESCRIPTION = new ParseField("data_description"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField FINISHED_TIME = new ParseField("finished_time"); @@ -478,6 +479,18 @@ public Map getCustomSettings() { return customSettings; } + public Boolean keepJobData() { + if (customSettings != null && customSettings.containsKey(KEEP_JOB_DATA.getPreferredName())) { + Object value = customSettings.get(KEEP_JOB_DATA.getPreferredName()); + if (value instanceof Boolean) { + return (Boolean) value; + } else if (value instanceof String) { + return "true".equalsIgnoreCase((String) value); + } + } + return false; + } + public String getModelSnapshotId() { return modelSnapshotId; } diff --git a/x-pack/plugin/ml/build.gradle b/x-pack/plugin/ml/build.gradle index 706d7ea73aea9..03036891845bc 100644 --- a/x-pack/plugin/ml/build.gradle +++ b/x-pack/plugin/ml/build.gradle @@ -57,7 +57,8 @@ esplugin.bundleSpec.exclude 'platform/licenses/**' } dependencies { - testImplementation project(path: ':x-pack:plugin:inference') + implementation project(path: ':libs:elasticsearch-logging') + testImplementation project(path: ':x-pack:plugin:inference') compileOnly project(':modules:lang-painless:spi') compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('autoscaling')) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 2d4ea308a6693..70fad6a05e277 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -159,6 +159,12 @@ public void build() throws IOException, InterruptedException { buildQuantiles(command); processPipes.addArgs(command); + + // log the command + if (job.keepJobData()) { + // log the command to reproduce the autodetect run + logger.info("Autodetect process command: " + command); + } controller.startProcess(command); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index c25cac48b27d5..223ae532ff9db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -25,10 +25,12 @@ import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.process.ProcessResultsParser; +import org.elasticsearch.xpack.ml.utils.FileUtils; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -110,11 +112,19 @@ public AutodetectProcess createAutodetectProcess( nativeController, processPipes, numberOfFields, - filesToDelete, + (job.keepJobData() == false) ? filesToDelete : new ArrayList<>(), resultsParser, onProcessCrash ); + try { + // check if jobs'custom settings contain the setting 'keep_job_data' + // and if it is set to true, then we create the autodetect controll message file + if (job.keepJobData()) { + FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile()); + Path controlMsgFile = Files.createTempFile(env.tmpFile(), "autodetect_control_msg", ".json"); + autodetect.setControlMessageFilePath(controlMsgFile); + } autodetect.start(executorService, stateProcessor); return autodetect; } catch (IOException | EsRejectedExecutionException e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index dd71800bd4f90..da57317b0f0db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -61,6 +61,7 @@ public abstract class AbstractNativeProcess implements NativeProcess { private volatile boolean processCloseInitiated; private volatile boolean processKilled; private volatile boolean isReady; + private volatile Path controlMessageFilePath; protected AbstractNativeProcess( String jobId, @@ -107,7 +108,7 @@ public void start(ExecutorService executorService) throws IOException { processPipes.connectOtherStreams(); if (processPipes.getProcessInStream().isPresent()) { processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get())); - this.recordWriter.set(new LengthEncodedWriter(processInStream.get())); + this.recordWriter.set(new LengthEncodedWriter(processInStream.get(), getControlMessageFilePath())); } processOutStream.set(processPipes.getProcessOutStream().orElse(null)); processRestoreStream.set(processPipes.getRestoreStream().orElse(null)); @@ -348,4 +349,14 @@ public void consumeAndCloseOutputStream() { // Given we are closing down the process there is no point propagating IO exceptions here } } + + @Nullable + public Path getControlMessageFilePath() { + return controlMessageFilePath; + } + + public void setControlMessageFilePath(Path controlMessageFilePath) { + this.controlMessageFilePath = controlMessageFilePath; + } + } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 82a44c4f3b075..8b115288ac24b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -6,12 +6,16 @@ */ package org.elasticsearch.xpack.ml.process.writer; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; /** @@ -27,18 +31,49 @@ */ public class LengthEncodedWriter implements RecordWriter { private OutputStream outputStream; + // In case customer setting "keep_job_data" is set to true, we will write the data to a file + // additionally to the output stream. + private OutputStream fileOutputStream; private ByteBuffer lengthBuffer; + private Logger logger = LogManager.getLogger(LengthEncodedWriter.class); + /** * Create the writer on the OutputStream os. * This object will never close os. */ public LengthEncodedWriter(OutputStream os) { + this(os, null); + } + + public LengthEncodedWriter(OutputStream os, Path filePath) { outputStream = os; + try { + if (filePath != null) { + logger.info("Opening file: " + filePath + " for writing."); + fileOutputStream = Files.newOutputStream(filePath); + } else { + fileOutputStream = null; + } + } catch (IOException e) { + logger.error("Failed to open file: " + filePath + " for writing.", e.getMessage(), e); + fileOutputStream = null; + } // This will be used to convert 32 bit integers to network byte order lengthBuffer = ByteBuffer.allocate(4); // 4 == sizeof(int) } + // Add public destructor + public void close() { + if (fileOutputStream != null) { + try { + fileOutputStream.close(); + } catch (IOException e) { + logger.error("Failed to close file output stream.", e.getMessage(), e); + } + } + } + /** * Convert each String in the record array to a length/value encoded pair * and write to the outputstream. @@ -75,6 +110,9 @@ public void writeNumFields(int numFields) throws IOException { lengthBuffer.clear(); lengthBuffer.putInt(numFields); outputStream.write(lengthBuffer.array()); + if (fileOutputStream != null) { + fileOutputStream.write(lengthBuffer.array()); + } } /** @@ -87,10 +125,17 @@ public void writeField(String field) throws IOException { lengthBuffer.putInt(utf8Bytes.length); outputStream.write(lengthBuffer.array()); outputStream.write(utf8Bytes); + if (fileOutputStream != null) { + fileOutputStream.write(lengthBuffer.array()); + fileOutputStream.write(utf8Bytes); + } } @Override public void flush() throws IOException { outputStream.flush(); + if (fileOutputStream != null) { + fileOutputStream.flush(); + } } }