Skip to content

Commit

Permalink
[ML] Fix job ID in C++ logs for normalize and memory estimation
Browse files Browse the repository at this point in the history
The changes of elastic#54636 and elastic#60395 were incorrect in their assertion
that "the job ID passed to the process pipes is only used to make
the file names unique".  In fact it is also passed to the C++ log
handler and gets logged with every message logged by the C++
processes.

This PR splits the job ID and unique IDs (added in elastic#54636 and elastic#60395)
so that the correct job ID is passed to the log handler.

Nothing really bad happened as a result of this problem - it was
just cosmetic.
  • Loading branch information
droberts195 committed Oct 19, 2020
1 parent 70d88ef commit 7cbbcbf
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public NativeAnalyticsProcess createAnalyticsProcess(DataFrameAnalyticsConfig co
String jobId = config.getId();
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
false, true, true, hasState, config.getAnalysis().persistsState());
null, false, true, true, hasState, config.getAnalysis().persistsState());

// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ public NativeMemoryUsageEstimationProcess createAnalyticsProcess(
ExecutorService executorService,
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
// The config ID passed to the process pipes is only used to make the file names unique. Since memory estimation can be
// called many times in quick succession for the same config the config ID alone is not sufficient to guarantee that the
// memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID
// to ensure uniqueness between calls.
// Since memory estimation can be called many times in quick succession for the same config the config ID alone is not
// sufficient to guarantee that the memory estimation process pipe names are unique. Therefore an increasing counter
// value is passed as well as the config ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(
env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId(), counter.incrementAndGet(),
false, false, true, false, false);

createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public AutodetectProcess createAutodetectProcess(Job job,
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT,
job.getId(), false, true, true, params.modelSnapshot() != null,
job.getId(), null, false, true, true, params.modelSnapshot() != null,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false);
createNativeProcess(job, params, processPipes, filesToDelete);
boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ void setProcessConnectTimeout(TimeValue processConnectTimeout) {
@Override
public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
ExecutorService executorService) {
// The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times
// in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
// are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
// Since normalize can get run many times in quick succession for the same job the job ID alone is not sufficient to
// guarantee that the normalizer process pipe names are unique. Therefore an increasing counter value is passed as
// well as the job ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE,
jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
jobId, counter.incrementAndGet(), false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);

NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static NativeController makeNativeController(String localNodeName, Enviro
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry)
throws IOException {
this.localNodeName = localNodeName;
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null,
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, null,
true, false, true, false, false);
processPipes.connectLogStream();
this.cppLogHandler = processPipes.getLogStreamHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class ProcessPipes {
* May be null or empty for processes not associated with a specific job.
*/
public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId,
boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
Long uniqueId, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
boolean wantRestorePipe, boolean wantPersistPipe) {
this.namedPipeHelper = namedPipeHelper;
this.jobId = jobId;
Expand All @@ -91,6 +91,9 @@ public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration t
if (!Strings.isNullOrEmpty(jobId)) {
prefixBuilder.append(jobId).append('_');
}
if (uniqueId != null) {
prefixBuilder.append(uniqueId).append('_');
}
String prefix = prefixBuilder.toString();
String suffix = String.format(Locale.ROOT, "_%d", JvmInfo.jvmInfo().getPid());
logPipeName = String.format(Locale.ROOT, "%slog%s", prefix, suffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testProcessPipes() throws Exception {

int timeoutSeconds = randomIntBetween(5, 100);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT,
"my_job", false, true, true, true, true);
"my_job", null, false, true, true, true, true);

List<String> command = new ArrayList<>();
processPipes.addArgs(command);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testCloseUnusedPipes_notConnected() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);

new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", null,
true, true, true, true, true);
}

Expand Down Expand Up @@ -138,7 +138,7 @@ public void testCloseOpenedPipesOnError() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true);
null, true, true, true, true, true);

processPipes.connectLogStream();
expectThrows(IOException.class, processPipes::connectOtherStreams);
Expand Down

0 comments on commit 7cbbcbf

Please sign in to comment.