Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication mechanism for js #289

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7b23218
Job Details from Extension for JS
vibrantvarun Dec 27, 2022
ecb3462
Communication Mechanism for JS
vibrantvarun Dec 28, 2022
692d659
Communication mechanism for JS
vibrantvarun Dec 29, 2022
97e783b
Communication mechanism for JS
vibrantvarun Dec 29, 2022
b97d08a
Communication mechanism for JS
vibrantvarun Dec 29, 2022
92e5a15
Communication mechanism for JS
vibrantvarun Dec 29, 2022
da64ca7
Commnunication Mechanism for JS
vibrantvarun Dec 29, 2022
5eb8dc5
Commnunication Mechanism for JS
vibrantvarun Dec 29, 2022
21dfd75
Commnunication Mechanism for JS
vibrantvarun Dec 30, 2022
15e9e2e
Commnunication Mechanism for JS
vibrantvarun Dec 30, 2022
c9933e6
Commnunication Mechanism for JS
vibrantvarun Dec 30, 2022
ff9e493
Commnunication Mechanism for JS
vibrantvarun Dec 30, 2022
7933c4e
Commnunication Mechanism for JS
vibrantvarun Dec 30, 2022
37c8d0d
Communication Mechanism for JS
vibrantvarun Jan 3, 2023
7879155
Commnunication Work for JS
vibrantvarun Jan 3, 2023
1367c02
Communication Mechanism for JS
vibrantvarun Jan 3, 2023
a27c263
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
f4e36f0
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
0b62209
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
fac7c69
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
1494680
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
dbc9382
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
6796447
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
eb1ccb6
Merge branch 'main' of https://github.com/opensearch-project/job-sche…
vibrantvarun Jan 5, 2023
3ef8a3c
Merge branch 'main' into Communication_Mechanism_For_JS
vibrantvarun Jan 5, 2023
b726e5c
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
a123089
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
d6bea89
Communication Mechanism for JS
vibrantvarun Jan 5, 2023
3613ec8
Communication Mechanism for JS
vibrantvarun Jan 6, 2023
9a881dc
Communication Mechanism for JS
vibrantvarun Jan 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ repositories {

dependencies {
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
implementation group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
implementation group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
javaRestTestImplementation project.sourceSets.main.runtimeClasspath
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@
*/
package org.opensearch.jobscheduler;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.jobscheduler.model.JobDetails;
import org.opensearch.jobscheduler.rest.RestGetJobIndexAction;
import org.opensearch.jobscheduler.rest.RestGetJobTypeAction;
import org.opensearch.jobscheduler.scheduler.JobScheduler;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
Expand All @@ -23,35 +34,43 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.jobscheduler.transport.GetJobIndexAction;
import org.opensearch.jobscheduler.transport.GetJobIndexTransportAction;
import org.opensearch.jobscheduler.transport.GetJobTypeAction;
import org.opensearch.jobscheduler.transport.GetJobTypeTransportAction;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.function.Supplier;

public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin {
import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";

private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class);

Expand All @@ -60,10 +79,12 @@ public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin {
private LockService lockService;
private Map<String, ScheduledJobProvider> indexToJobProviders;
private Set<String> indicesToListen;
private Map<String, JobDetails> indexToJobDetails;

public JobSchedulerPlugin() {
this.indicesToListen = new HashSet<>();
this.indexToJobProviders = new HashMap<>();
this.indexToJobDetails = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -185,4 +206,27 @@ private JobSweeper initSweeper(
) {
return new JobSweeper(settings, client, clusterService, threadPool, registry, this.indexToJobProviders, scheduler, lockService);
}

@Override
public List getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
RestGetJobIndexAction restGetJobIndexAction = new RestGetJobIndexAction(indexToJobDetails);
RestGetJobTypeAction restGetJobTypeAction = new RestGetJobTypeAction(indexToJobDetails);
return ImmutableList.of(restGetJobIndexAction, restGetJobTypeAction);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(GetJobIndexAction.INSTANCE, GetJobIndexTransportAction.class),
new ActionHandler<>(GetJobTypeAction.INSTANCE, GetJobTypeTransportAction.class)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.constant;

public class CommonValue {

public static String EXTERNAL_ACTION_PREFIX = "cluster:admin/opendistro/js/";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why opendistro?

Copy link
Member Author

@vibrantvarun vibrantvarun Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I am changing it to plugins. The reason to change as suggested by @saratvemulapalli is

opendistro is part of ODFE: https://opendistro.github.io/for-elasticsearch/ which end of life.
AD security features were built in ODFE which is why it has legacy code

}
176 changes: 176 additions & 0 deletions src/main/java/org/opensearch/jobscheduler/model/JobDetails.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.model;

import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This model class stores the job details of the extension.
*/
public class JobDetails implements ToXContentObject {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved

/**
* jobIndex from the extension.
*/
private String jobIndex;

/**
* jobType from the extension.
*/
private String jobType;

/**
* jobParser action to trigger the response back to the extension.
*/
private String jobParserAction;

/**
* jobRunner action to trigger the response back to the extension.
*/
private String jobRunnerAction;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need jobRunner action and jobParser action?
Isn't there only one callback?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be 2 callback methods. One to retrieve ScheduleJobParser and other would be ScheduleJobRunner

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm why though, could you help me understand the flow? May be im missing the pieces to pull the puzzle together?

Copy link
Member Author

@vibrantvarun vibrantvarun Dec 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In JobSweeper. java sweep method

                    XContentParser parser = XContentHelper.createParser(
                        this.xContentRegistry,
                        LoggingDeprecationHandler.INSTANCE,
                        jobSource,
                        XContentType.JSON
                    );
                    ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, jobDocVersion);
                    if (jobParameter == null) {
                        // allow parser to return null, which means this is not a scheduled job document.
                        return null;
                    }
                    ScheduledJobRunner jobRunner = this.indexToProviders.get(shardId.getIndexName()).getJobRunner();
                    if (jobParameter.isEnabled()) {
                        this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, jobDocVersion, jitterLimit);
                    }
                    return jobDocVersion;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saratvemulapalli We'll need to transfer the bytesReference (jobDocVersion) to AD to parse the ScheduledJobParameter (AnomalyDetectorJob) before we are able to schedule the job and invoke the Job Runner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With extensions, the parser is not really passed to JS to run, probably lets name it get job param as an API as we discussed offline.


public static final String JOB_INDEX = "job_index";
public static final String JOB_TYPE = "job_type";
public static final String JOB_PARSER_ACTION = "job_parser_action";
public static final String JOB_RUNNER_ACTION = "job_runner_action";

public JobDetails() {}

public JobDetails(String jobIndex, String jobType, String jobParserAction, String jobRunnerAction) {
this.jobIndex = jobIndex;
this.jobType = jobType;
this.jobParserAction = jobParserAction;
this.jobRunnerAction = jobRunnerAction;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
if (jobIndex != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand, a. do we need to check if they are null?
b. And would they be ever null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we do have to check if they are null or not as while testing if we provide no value then it should handle that test case gracefully.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the api call will be made then the job index will never be null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK

xContentBuilder.field(JOB_INDEX, jobIndex);
}
if (jobType != null) {
xContentBuilder.field(JOB_TYPE, jobType);
}
if (jobParserAction != null) {
xContentBuilder.field(JOB_PARSER_ACTION, jobParserAction);
}
if (jobRunnerAction != null) {
xContentBuilder.field(JOB_RUNNER_ACTION, jobRunnerAction);
}
return xContentBuilder.endObject();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nulls are dangerous and usually best avoided. Personal preference is to try to use Optional instead. I can be convinced to stick with null but will ask for it to be amply documented in that case.

Looking at all the classes in this PR, it seems that we always require jobType to be non-null, but the other values are allowed to be null. That isn't obvious from looking at this class in isolation. I suggest:

  1. Test the value of jobType before returning here, and throw an appropriate parsing exception here if it is null.
  2. Add the @Nullable annotation to the getters for the other three fields.
  3. Add the @Nullable annotation to each of the (allowed to be null) parameters on the constructor.
  4. Add a javadoc somewhere (perhaps the class itself, or this parsing method) indicating the allowed/disallowed nulls.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

public static JobDetails parse(XContentParser parser) throws IOException {
String jobIndex = null;
String jobType = null;
String jobParserAction = null;
String jobRunnerAction = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case JOB_INDEX:
jobIndex = parser.text();
break;
case JOB_TYPE:
jobType = parser.text();
break;
case JOB_PARSER_ACTION:
jobParserAction = parser.text();
break;
case JOB_RUNNER_ACTION:
jobRunnerAction = parser.text();
break;
default:
parser.skipChildren();
break;
}
}

return new JobDetails(jobIndex, jobType, jobParserAction, jobRunnerAction);
}

public String getJobIndex() {
return jobIndex;
}

public void setJobIndex(String jobIndex) {
this.jobIndex = jobIndex;
}

public String getJobType() {
return jobType;
}

public void setJobType(String jobType) {
this.jobType = jobType;
}

public String getJobParserAction() {
return jobParserAction;
}

public void setJobParserAction(String jobParserAction) {
this.jobParserAction = jobParserAction;
}

public String getJobRunnerAction() {
return jobRunnerAction;
}

public void setJobRunnerAction(String jobRunnerAction) {
this.jobRunnerAction = jobRunnerAction;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobDetails that = (JobDetails) o;
return Objects.equals(jobIndex, that.jobIndex)
&& Objects.equals(jobType, that.jobType)
&& Objects.equals(jobParserAction, that.jobParserAction)
&& Objects.equals(jobRunnerAction, that.jobRunnerAction);
}

@Override
public int hashCode() {
return Objects.hash(jobIndex, jobType, jobParserAction, jobRunnerAction);
}

@Override
public String toString() {
return "JobDetails{"
+ "jobIndex='"
+ jobIndex
+ '\''
+ ", jobType='"
+ jobType
+ '\''
+ ", jobParserAction='"
+ jobParserAction
+ '\''
+ ", jobRunnerAction='"
+ jobRunnerAction
+ '\''
+ '}';
}
}
Loading